diff --git a/Cargo.lock b/Cargo.lock index 9d24aa6a11..c189ea1df7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1881,6 +1881,19 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "daft-physical-plan" +version = "0.2.0-dev0" +dependencies = [ + "common-error", + "daft-core", + "daft-dsl", + "daft-plan", + "daft-scan", + "log", + "strum 0.26.2", +] + [[package]] name = "daft-plan" version = "0.2.0-dev0" @@ -4691,6 +4704,9 @@ name = "strum" version = "0.26.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5d8cec3501a5194c432b2b7976db6b7d10ec95c253208b45f83f7136aa985e29" +dependencies = [ + "strum_macros 0.26.2", +] [[package]] name = "strum_macros" diff --git a/Cargo.toml b/Cargo.toml index b93aaa3315..a123b3b454 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -107,6 +107,7 @@ members = [ "src/daft-dsl", "src/daft-table", "src/daft-plan", + "src/daft-physical-plan", "src/daft-micropartition", "src/daft-scan", "src/daft-scheduler", diff --git a/src/daft-physical-plan/Cargo.toml b/src/daft-physical-plan/Cargo.toml new file mode 100644 index 0000000000..02d8275e9f --- /dev/null +++ b/src/daft-physical-plan/Cargo.toml @@ -0,0 +1,13 @@ +[dependencies] +common-error = {path = "../common/error", default-features = false} +daft-core = {path = "../daft-core", default-features = false} +daft-dsl = {path = "../daft-dsl", default-features = false} +daft-plan = {path = "../daft-plan", default-features = false} +daft-scan = {path = "../daft-scan", default-features = false} +log = {workspace = true} +strum = {version = "0.26", features = ["derive"]} + +[package] +edition = {workspace = true} +name = "daft-physical-plan" +version = {workspace = true} diff --git a/src/daft-physical-plan/src/lib.rs b/src/daft-physical-plan/src/lib.rs new file mode 100644 index 0000000000..9b0a183fa2 --- /dev/null +++ b/src/daft-physical-plan/src/lib.rs @@ -0,0 +1,6 @@ +#[allow(unused)] +mod local_plan; +mod translate; + +pub use local_plan::{LocalPhysicalPlan, LocalPhysicalPlanRef}; +pub use translate::translate; diff --git a/src/daft-physical-plan/src/local_plan.rs b/src/daft-physical-plan/src/local_plan.rs new file mode 100644 index 0000000000..b7e3fcf956 --- /dev/null +++ b/src/daft-physical-plan/src/local_plan.rs @@ -0,0 +1,224 @@ +use std::sync::Arc; + +use daft_core::schema::SchemaRef; +use daft_dsl::{AggExpr, ExprRef}; +use daft_plan::{InMemoryInfo, ResourceRequest}; +use daft_scan::{ScanTask, ScanTaskRef}; + +pub type LocalPhysicalPlanRef = Arc; +#[derive(Debug, strum::IntoStaticStr)] +pub enum LocalPhysicalPlan { + InMemoryScan(InMemoryScan), + PhysicalScan(PhysicalScan), + // EmptyScan(EmptyScan), + Project(Project), + Filter(Filter), + Limit(Limit), + // Explode(Explode), + // Unpivot(Unpivot), + Sort(Sort), + // Split(Split), + // Sample(Sample), + // MonotonicallyIncreasingId(MonotonicallyIncreasingId), + // Coalesce(Coalesce), + // Flatten(Flatten), + // FanoutRandom(FanoutRandom), + // FanoutByHash(FanoutByHash), + // #[allow(dead_code)] + // FanoutByRange(FanoutByRange), + // ReduceMerge(ReduceMerge), + UnGroupedAggregate(UnGroupedAggregate), + HashAggregate(HashAggregate), + // Pivot(Pivot), + // Concat(Concat), + HashJoin(HashJoin), + // SortMergeJoin(SortMergeJoin), + // BroadcastJoin(BroadcastJoin), + PhysicalWrite(PhysicalWrite), + // TabularWriteJson(TabularWriteJson), + // TabularWriteCsv(TabularWriteCsv), + // #[cfg(feature = "python")] + // IcebergWrite(IcebergWrite), + // #[cfg(feature = "python")] + // DeltaLakeWrite(DeltaLakeWrite), + // #[cfg(feature = "python")] + // LanceWrite(LanceWrite), +} + +impl LocalPhysicalPlan { + pub fn name(&self) -> &'static str { + // uses strum::IntoStaticStr + self.into() + } + + pub fn arced(self) -> LocalPhysicalPlanRef { + self.into() + } + + pub(crate) fn in_memory_scan(in_memory_info: InMemoryInfo) -> LocalPhysicalPlanRef { + LocalPhysicalPlan::InMemoryScan(InMemoryScan { + info: in_memory_info, + plan_stats: PlanStats {}, + }) + .arced() + } + + pub(crate) fn physical_scan( + scan_tasks: Vec, + schema: SchemaRef, + ) -> LocalPhysicalPlanRef { + LocalPhysicalPlan::PhysicalScan(PhysicalScan { + scan_tasks, + schema, + plan_stats: PlanStats {}, + }) + .arced() + } + + pub(crate) fn filter(input: LocalPhysicalPlanRef, predicate: ExprRef) -> LocalPhysicalPlanRef { + let schema = input.schema().clone(); + LocalPhysicalPlan::Filter(Filter { + input, + predicate, + schema, + plan_stats: PlanStats {}, + }) + .arced() + } + + pub(crate) fn limit(input: LocalPhysicalPlanRef, num_rows: i64) -> LocalPhysicalPlanRef { + let schema = input.schema().clone(); + LocalPhysicalPlan::Limit(Limit { + input, + num_rows, + schema, + plan_stats: PlanStats {}, + }) + .arced() + } + + pub(crate) fn project( + input: LocalPhysicalPlanRef, + projection: Vec, + resource_request: ResourceRequest, + schema: SchemaRef, + ) -> LocalPhysicalPlanRef { + LocalPhysicalPlan::Project(Project { + input, + projection, + resource_request, + schema, + plan_stats: PlanStats {}, + }) + .arced() + } + + pub(crate) fn ungrouped_aggregate( + input: LocalPhysicalPlanRef, + aggregations: Vec, + schema: SchemaRef, + ) -> LocalPhysicalPlanRef { + LocalPhysicalPlan::UnGroupedAggregate(UnGroupedAggregate { + input, + aggregations, + schema, + plan_stats: PlanStats {}, + }) + .arced() + } + + pub(crate) fn hash_aggregate( + input: LocalPhysicalPlanRef, + aggregations: Vec, + group_by: Vec, + schema: SchemaRef, + ) -> LocalPhysicalPlanRef { + LocalPhysicalPlan::HashAggregate(HashAggregate { + input, + aggregations, + group_by, + schema, + plan_stats: PlanStats {}, + }) + .arced() + } + pub(crate) fn schema(&self) -> &SchemaRef { + match self { + LocalPhysicalPlan::PhysicalScan(PhysicalScan { schema, .. }) + | LocalPhysicalPlan::Filter(Filter { schema, .. }) + | LocalPhysicalPlan::Limit(Limit { schema, .. }) + | LocalPhysicalPlan::Project(Project { schema, .. }) + | LocalPhysicalPlan::UnGroupedAggregate(UnGroupedAggregate { schema, .. }) + | LocalPhysicalPlan::HashAggregate(HashAggregate { schema, .. }) => schema, + _ => todo!("{:?}", self), + } + } +} + +#[derive(Debug)] + +pub struct InMemoryScan { + info: InMemoryInfo, + plan_stats: PlanStats, +} +#[derive(Debug)] + +pub struct PhysicalScan { + scan_tasks: Vec, + schema: SchemaRef, + plan_stats: PlanStats, +} +#[derive(Debug)] + +pub struct Project { + input: LocalPhysicalPlanRef, + projection: Vec, + resource_request: ResourceRequest, + schema: SchemaRef, + plan_stats: PlanStats, +} +#[derive(Debug)] + +pub struct Filter { + input: LocalPhysicalPlanRef, + predicate: ExprRef, + schema: SchemaRef, + plan_stats: PlanStats, +} +#[derive(Debug)] + +pub struct Limit { + input: LocalPhysicalPlanRef, + num_rows: i64, + schema: SchemaRef, + plan_stats: PlanStats, +} +#[derive(Debug)] + +pub struct Sort {} +#[derive(Debug)] + +pub struct UnGroupedAggregate { + input: LocalPhysicalPlanRef, + aggregations: Vec, + schema: SchemaRef, + plan_stats: PlanStats, +} +#[derive(Debug)] + +pub struct HashAggregate { + input: LocalPhysicalPlanRef, + aggregations: Vec, + group_by: Vec, + schema: SchemaRef, + plan_stats: PlanStats, +} +#[derive(Debug)] + +pub struct HashJoin {} +#[derive(Debug)] + +pub struct PhysicalWrite {} +#[derive(Debug)] + +pub struct PlanStats {} diff --git a/src/daft-physical-plan/src/translate.rs b/src/daft-physical-plan/src/translate.rs new file mode 100644 index 0000000000..786aefe02b --- /dev/null +++ b/src/daft-physical-plan/src/translate.rs @@ -0,0 +1,61 @@ +use common_error::DaftResult; +use daft_plan::{LogicalPlan, LogicalPlanRef, SourceInfo}; + +use crate::local_plan::{LocalPhysicalPlan, LocalPhysicalPlanRef}; + +pub fn translate(plan: &LogicalPlanRef) -> DaftResult { + match plan.as_ref() { + LogicalPlan::Source(source) => { + match source.source_info.as_ref() { + SourceInfo::InMemory(info) => Ok(LocalPhysicalPlan::in_memory_scan(info.clone())), + SourceInfo::Physical(info) => { + // We should be able to pass the ScanOperator into the physical plan directly but we need to figure out the serialization story + let scan_tasks_iter = info.scan_op.0.to_scan_tasks(info.pushdowns.clone())?; + let scan_tasks = scan_tasks_iter.collect::>>()?; + Ok(LocalPhysicalPlan::physical_scan( + scan_tasks, + info.source_schema.clone(), + )) + } + SourceInfo::PlaceHolder(_) => { + panic!("We should not encounter a PlaceHolder during translation") + } + } + } + LogicalPlan::Filter(filter) => { + let input = translate(&filter.input)?; + Ok(LocalPhysicalPlan::filter(input, filter.predicate.clone())) + } + LogicalPlan::Limit(limit) => { + let input = translate(&limit.input)?; + Ok(LocalPhysicalPlan::limit(input, limit.limit)) + } + LogicalPlan::Project(project) => { + let input = translate(&project.input)?; + Ok(LocalPhysicalPlan::project( + input, + project.projection.clone(), + project.resource_request.clone(), + project.projected_schema.clone(), + )) + } + LogicalPlan::Aggregate(aggregate) => { + let input = translate(&aggregate.input)?; + if aggregate.groupby.is_empty() { + Ok(LocalPhysicalPlan::ungrouped_aggregate( + input, + aggregate.aggregations.clone(), + aggregate.output_schema.clone(), + )) + } else { + Ok(LocalPhysicalPlan::hash_aggregate( + input, + aggregate.aggregations.clone(), + aggregate.groupby.clone(), + aggregate.output_schema.clone(), + )) + } + } + _ => todo!("{} not yet implemented", plan.name()), + } +} diff --git a/src/daft-plan/src/builder.rs b/src/daft-plan/src/builder.rs index 5a255013b9..de51fd2f2e 100644 --- a/src/daft-plan/src/builder.rs +++ b/src/daft-plan/src/builder.rs @@ -21,7 +21,7 @@ use daft_core::{ schema::{Schema, SchemaRef}, }; use daft_dsl::{col, ExprRef}; -use daft_scan::{file_format::FileFormat, Pushdowns, ScanExternalInfo, ScanOperatorRef}; +use daft_scan::{file_format::FileFormat, PhysicalScanInfo, Pushdowns, ScanOperatorRef}; #[cfg(feature = "python")] use { @@ -80,7 +80,7 @@ impl LogicalPlanBuilder { ) -> DaftResult { let schema = scan_operator.0.schema(); let partitioning_keys = scan_operator.0.partitioning_keys(); - let source_info = SourceInfo::External(ScanExternalInfo::new( + let source_info = SourceInfo::Physical(PhysicalScanInfo::new( scan_operator.clone(), schema.clone(), partitioning_keys.into(), diff --git a/src/daft-plan/src/lib.rs b/src/daft-plan/src/lib.rs index d655d08a87..a21873a5ea 100644 --- a/src/daft-plan/src/lib.rs +++ b/src/daft-plan/src/lib.rs @@ -30,7 +30,7 @@ pub use physical_planner::{ }; pub use resource_request::ResourceRequest; pub use sink_info::OutputFileInfo; -pub use source_info::{FileInfo, FileInfos, InMemoryInfo}; +pub use source_info::{FileInfo, FileInfos, InMemoryInfo, SourceInfo}; #[cfg(feature = "python")] use pyo3::prelude::*; diff --git a/src/daft-plan/src/logical_ops/source.rs b/src/daft-plan/src/logical_ops/source.rs index fc21db7de0..0b1e652bce 100644 --- a/src/daft-plan/src/logical_ops/source.rs +++ b/src/daft-plan/src/logical_ops/source.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use daft_core::schema::SchemaRef; -use daft_scan::ScanExternalInfo; +use daft_scan::PhysicalScanInfo; use crate::source_info::SourceInfo; @@ -30,7 +30,7 @@ impl Source { let mut res = vec![]; match self.source_info.as_ref() { - SourceInfo::External(ScanExternalInfo { + SourceInfo::Physical(PhysicalScanInfo { source_schema, scan_op, partitioning_keys, diff --git a/src/daft-plan/src/logical_optimization/rules/push_down_filter.rs b/src/daft-plan/src/logical_optimization/rules/push_down_filter.rs index c67d8a2751..b741a7247c 100644 --- a/src/daft-plan/src/logical_optimization/rules/push_down_filter.rs +++ b/src/daft-plan/src/logical_optimization/rules/push_down_filter.rs @@ -76,14 +76,14 @@ impl OptimizerRule for PushDownFilter { // Filter pushdown is not supported for in-memory sources. SourceInfo::InMemory(_) => return Ok(Transformed::No(plan)), // Do not pushdown if Source node already has a limit - SourceInfo::External(external_info) + SourceInfo::Physical(external_info) if let Some(_) = external_info.pushdowns.limit => { return Ok(Transformed::No(plan)) } // Pushdown filter into the Source node - SourceInfo::External(external_info) => { + SourceInfo::Physical(external_info) => { let predicate = &filter.predicate; let new_predicate = external_info .pushdowns @@ -144,7 +144,7 @@ impl OptimizerRule for PushDownFilter { let new_external_info = external_info.with_pushdowns(new_pushdowns); let new_source: LogicalPlan = Source::new( source.output_schema.clone(), - SourceInfo::External(new_external_info).into(), + SourceInfo::Physical(new_external_info).into(), ) .into(); if !needing_filter_op.is_empty() { diff --git a/src/daft-plan/src/logical_optimization/rules/push_down_limit.rs b/src/daft-plan/src/logical_optimization/rules/push_down_limit.rs index dc92754d1f..1da5a02c39 100644 --- a/src/daft-plan/src/logical_optimization/rules/push_down_limit.rs +++ b/src/daft-plan/src/logical_optimization/rules/push_down_limit.rs @@ -53,19 +53,19 @@ impl OptimizerRule for PushDownLimit { // Limit pushdown is not supported for in-memory sources. SourceInfo::InMemory(_) => Ok(Transformed::No(plan)), // Do not pushdown if Source node is already more limited than `limit` - SourceInfo::External(external_info) + SourceInfo::Physical(external_info) if let Some(existing_limit) = external_info.pushdowns.limit && existing_limit <= limit => { Ok(Transformed::No(plan)) } // Pushdown limit into the Source node as a "local" limit - SourceInfo::External(external_info) => { + SourceInfo::Physical(external_info) => { let new_pushdowns = external_info.pushdowns.with_limit(Some(limit)); let new_external_info = external_info.with_pushdowns(new_pushdowns); let new_source = LogicalPlan::Source(Source::new( source.output_schema.clone(), - SourceInfo::External(new_external_info).into(), + SourceInfo::Physical(new_external_info).into(), )) .into(); let out_plan = if external_info.scan_op.0.can_absorb_limit() { diff --git a/src/daft-plan/src/logical_optimization/rules/push_down_projection.rs b/src/daft-plan/src/logical_optimization/rules/push_down_projection.rs index 11285e69b5..3b5bc51c23 100644 --- a/src/daft-plan/src/logical_optimization/rules/push_down_projection.rs +++ b/src/daft-plan/src/logical_optimization/rules/push_down_projection.rs @@ -145,7 +145,7 @@ impl PushDownProjection { panic!() }; match source.source_info.as_ref() { - SourceInfo::External(external_info) => { + SourceInfo::Physical(external_info) => { if required_columns.len() < upstream_schema.names().len() { let pruned_upstream_schema = upstream_schema .fields @@ -156,7 +156,7 @@ impl PushDownProjection { let schema = Schema::new(pruned_upstream_schema)?; let new_source: LogicalPlan = Source::new( schema.into(), - Arc::new(SourceInfo::External(external_info.with_pushdowns( + Arc::new(SourceInfo::Physical(external_info.with_pushdowns( external_info.pushdowns.with_columns(Some(Arc::new( required_columns.iter().cloned().collect(), ))), diff --git a/src/daft-plan/src/physical_planner/translate.rs b/src/daft-plan/src/physical_planner/translate.rs index c4280480b9..698d384154 100644 --- a/src/daft-plan/src/physical_planner/translate.rs +++ b/src/daft-plan/src/physical_planner/translate.rs @@ -15,7 +15,7 @@ use daft_core::DataType; use daft_dsl::ExprRef; use daft_dsl::{col, ApproxPercentileParams}; -use daft_scan::ScanExternalInfo; +use daft_scan::PhysicalScanInfo; use crate::logical_ops::{ Aggregate as LogicalAggregate, Distinct as LogicalDistinct, Explode as LogicalExplode, @@ -41,7 +41,7 @@ pub(super) fn translate_single_logical_node( ) -> DaftResult { match logical_plan { LogicalPlan::Source(Source { source_info, .. }) => match source_info.as_ref() { - SourceInfo::External(ScanExternalInfo { + SourceInfo::Physical(PhysicalScanInfo { pushdowns, scan_op, source_schema, diff --git a/src/daft-plan/src/source_info/mod.rs b/src/daft-plan/src/source_info/mod.rs index 49c701b6ae..f1b31e5ab3 100644 --- a/src/daft-plan/src/source_info/mod.rs +++ b/src/daft-plan/src/source_info/mod.rs @@ -1,6 +1,6 @@ pub mod file_info; use daft_core::schema::SchemaRef; -use daft_scan::ScanExternalInfo; +use daft_scan::PhysicalScanInfo; pub use file_info::{FileInfo, FileInfos}; use serde::{Deserialize, Serialize}; use std::hash::Hash; @@ -18,7 +18,7 @@ use { #[derive(Debug, PartialEq, Eq, Hash)] pub enum SourceInfo { InMemory(InMemoryInfo), - External(ScanExternalInfo), + Physical(PhysicalScanInfo), PlaceHolder(PlaceHolderInfo), } diff --git a/src/daft-scan/src/lib.rs b/src/daft-scan/src/lib.rs index 7f8f33a4bc..a101417706 100644 --- a/src/daft-scan/src/lib.rs +++ b/src/daft-scan/src/lib.rs @@ -716,14 +716,14 @@ impl Display for ScanOperatorRef { } #[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct ScanExternalInfo { +pub struct PhysicalScanInfo { pub scan_op: ScanOperatorRef, pub source_schema: SchemaRef, pub partitioning_keys: Vec, pub pushdowns: Pushdowns, } -impl ScanExternalInfo { +impl PhysicalScanInfo { pub fn new( scan_op: ScanOperatorRef, source_schema: SchemaRef,