Skip to content

Commit

Permalink
[PERF] Local Execution Plan (#2489)
Browse files Browse the repository at this point in the history
  • Loading branch information
samster25 authored Jul 9, 2024
1 parent 0bd1d27 commit ecebb82
Show file tree
Hide file tree
Showing 15 changed files with 340 additions and 19 deletions.
16 changes: 16 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ members = [
"src/daft-dsl",
"src/daft-table",
"src/daft-plan",
"src/daft-physical-plan",
"src/daft-micropartition",
"src/daft-scan",
"src/daft-scheduler",
Expand Down
13 changes: 13 additions & 0 deletions src/daft-physical-plan/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[dependencies]
common-error = {path = "../common/error", default-features = false}
daft-core = {path = "../daft-core", default-features = false}
daft-dsl = {path = "../daft-dsl", default-features = false}
daft-plan = {path = "../daft-plan", default-features = false}
daft-scan = {path = "../daft-scan", default-features = false}
log = {workspace = true}
strum = {version = "0.26", features = ["derive"]}

[package]
edition = {workspace = true}
name = "daft-physical-plan"
version = {workspace = true}
6 changes: 6 additions & 0 deletions src/daft-physical-plan/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#[allow(unused)]
mod local_plan;
mod translate;

pub use local_plan::{LocalPhysicalPlan, LocalPhysicalPlanRef};
pub use translate::translate;
224 changes: 224 additions & 0 deletions src/daft-physical-plan/src/local_plan.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
use std::sync::Arc;

use daft_core::schema::SchemaRef;
use daft_dsl::{AggExpr, ExprRef};
use daft_plan::{InMemoryInfo, ResourceRequest};
use daft_scan::{ScanTask, ScanTaskRef};

pub type LocalPhysicalPlanRef = Arc<LocalPhysicalPlan>;
#[derive(Debug, strum::IntoStaticStr)]
pub enum LocalPhysicalPlan {
InMemoryScan(InMemoryScan),
PhysicalScan(PhysicalScan),
// EmptyScan(EmptyScan),
Project(Project),
Filter(Filter),
Limit(Limit),
// Explode(Explode),
// Unpivot(Unpivot),
Sort(Sort),
// Split(Split),
// Sample(Sample),
// MonotonicallyIncreasingId(MonotonicallyIncreasingId),
// Coalesce(Coalesce),
// Flatten(Flatten),
// FanoutRandom(FanoutRandom),
// FanoutByHash(FanoutByHash),
// #[allow(dead_code)]
// FanoutByRange(FanoutByRange),
// ReduceMerge(ReduceMerge),
UnGroupedAggregate(UnGroupedAggregate),
HashAggregate(HashAggregate),
// Pivot(Pivot),
// Concat(Concat),
HashJoin(HashJoin),
// SortMergeJoin(SortMergeJoin),
// BroadcastJoin(BroadcastJoin),
PhysicalWrite(PhysicalWrite),
// TabularWriteJson(TabularWriteJson),
// TabularWriteCsv(TabularWriteCsv),
// #[cfg(feature = "python")]
// IcebergWrite(IcebergWrite),
// #[cfg(feature = "python")]
// DeltaLakeWrite(DeltaLakeWrite),
// #[cfg(feature = "python")]
// LanceWrite(LanceWrite),
}

impl LocalPhysicalPlan {
pub fn name(&self) -> &'static str {
// uses strum::IntoStaticStr
self.into()
}

pub fn arced(self) -> LocalPhysicalPlanRef {
self.into()
}

pub(crate) fn in_memory_scan(in_memory_info: InMemoryInfo) -> LocalPhysicalPlanRef {
LocalPhysicalPlan::InMemoryScan(InMemoryScan {
info: in_memory_info,
plan_stats: PlanStats {},
})
.arced()
}

pub(crate) fn physical_scan(
scan_tasks: Vec<ScanTaskRef>,
schema: SchemaRef,
) -> LocalPhysicalPlanRef {
LocalPhysicalPlan::PhysicalScan(PhysicalScan {
scan_tasks,
schema,
plan_stats: PlanStats {},
})
.arced()
}

pub(crate) fn filter(input: LocalPhysicalPlanRef, predicate: ExprRef) -> LocalPhysicalPlanRef {
let schema = input.schema().clone();
LocalPhysicalPlan::Filter(Filter {
input,
predicate,
schema,
plan_stats: PlanStats {},
})
.arced()
}

pub(crate) fn limit(input: LocalPhysicalPlanRef, num_rows: i64) -> LocalPhysicalPlanRef {
let schema = input.schema().clone();
LocalPhysicalPlan::Limit(Limit {
input,
num_rows,
schema,
plan_stats: PlanStats {},
})
.arced()
}

pub(crate) fn project(
input: LocalPhysicalPlanRef,
projection: Vec<ExprRef>,
resource_request: ResourceRequest,
schema: SchemaRef,
) -> LocalPhysicalPlanRef {
LocalPhysicalPlan::Project(Project {
input,
projection,
resource_request,
schema,
plan_stats: PlanStats {},
})
.arced()
}

pub(crate) fn ungrouped_aggregate(
input: LocalPhysicalPlanRef,
aggregations: Vec<AggExpr>,
schema: SchemaRef,
) -> LocalPhysicalPlanRef {
LocalPhysicalPlan::UnGroupedAggregate(UnGroupedAggregate {
input,
aggregations,
schema,
plan_stats: PlanStats {},
})
.arced()
}

pub(crate) fn hash_aggregate(
input: LocalPhysicalPlanRef,
aggregations: Vec<AggExpr>,
group_by: Vec<ExprRef>,
schema: SchemaRef,
) -> LocalPhysicalPlanRef {
LocalPhysicalPlan::HashAggregate(HashAggregate {
input,
aggregations,
group_by,
schema,
plan_stats: PlanStats {},
})
.arced()
}
pub(crate) fn schema(&self) -> &SchemaRef {
match self {
LocalPhysicalPlan::PhysicalScan(PhysicalScan { schema, .. })
| LocalPhysicalPlan::Filter(Filter { schema, .. })
| LocalPhysicalPlan::Limit(Limit { schema, .. })
| LocalPhysicalPlan::Project(Project { schema, .. })
| LocalPhysicalPlan::UnGroupedAggregate(UnGroupedAggregate { schema, .. })
| LocalPhysicalPlan::HashAggregate(HashAggregate { schema, .. }) => schema,
_ => todo!("{:?}", self),
}
}
}

#[derive(Debug)]

pub struct InMemoryScan {
info: InMemoryInfo,
plan_stats: PlanStats,
}
#[derive(Debug)]

pub struct PhysicalScan {
scan_tasks: Vec<ScanTaskRef>,
schema: SchemaRef,
plan_stats: PlanStats,
}
#[derive(Debug)]

pub struct Project {
input: LocalPhysicalPlanRef,
projection: Vec<ExprRef>,
resource_request: ResourceRequest,
schema: SchemaRef,
plan_stats: PlanStats,
}
#[derive(Debug)]

pub struct Filter {
input: LocalPhysicalPlanRef,
predicate: ExprRef,
schema: SchemaRef,
plan_stats: PlanStats,
}
#[derive(Debug)]

pub struct Limit {
input: LocalPhysicalPlanRef,
num_rows: i64,
schema: SchemaRef,
plan_stats: PlanStats,
}
#[derive(Debug)]

pub struct Sort {}
#[derive(Debug)]

pub struct UnGroupedAggregate {
input: LocalPhysicalPlanRef,
aggregations: Vec<AggExpr>,
schema: SchemaRef,
plan_stats: PlanStats,
}
#[derive(Debug)]

pub struct HashAggregate {
input: LocalPhysicalPlanRef,
aggregations: Vec<AggExpr>,
group_by: Vec<ExprRef>,
schema: SchemaRef,
plan_stats: PlanStats,
}
#[derive(Debug)]

pub struct HashJoin {}
#[derive(Debug)]

pub struct PhysicalWrite {}
#[derive(Debug)]

pub struct PlanStats {}
61 changes: 61 additions & 0 deletions src/daft-physical-plan/src/translate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use common_error::DaftResult;
use daft_plan::{LogicalPlan, LogicalPlanRef, SourceInfo};

use crate::local_plan::{LocalPhysicalPlan, LocalPhysicalPlanRef};

pub fn translate(plan: &LogicalPlanRef) -> DaftResult<LocalPhysicalPlanRef> {
match plan.as_ref() {
LogicalPlan::Source(source) => {
match source.source_info.as_ref() {
SourceInfo::InMemory(info) => Ok(LocalPhysicalPlan::in_memory_scan(info.clone())),
SourceInfo::Physical(info) => {
// We should be able to pass the ScanOperator into the physical plan directly but we need to figure out the serialization story
let scan_tasks_iter = info.scan_op.0.to_scan_tasks(info.pushdowns.clone())?;
let scan_tasks = scan_tasks_iter.collect::<DaftResult<Vec<_>>>()?;
Ok(LocalPhysicalPlan::physical_scan(
scan_tasks,
info.source_schema.clone(),
))
}
SourceInfo::PlaceHolder(_) => {
panic!("We should not encounter a PlaceHolder during translation")
}
}
}
LogicalPlan::Filter(filter) => {
let input = translate(&filter.input)?;
Ok(LocalPhysicalPlan::filter(input, filter.predicate.clone()))
}
LogicalPlan::Limit(limit) => {
let input = translate(&limit.input)?;
Ok(LocalPhysicalPlan::limit(input, limit.limit))
}
LogicalPlan::Project(project) => {
let input = translate(&project.input)?;
Ok(LocalPhysicalPlan::project(
input,
project.projection.clone(),
project.resource_request.clone(),
project.projected_schema.clone(),
))
}
LogicalPlan::Aggregate(aggregate) => {
let input = translate(&aggregate.input)?;
if aggregate.groupby.is_empty() {
Ok(LocalPhysicalPlan::ungrouped_aggregate(
input,
aggregate.aggregations.clone(),
aggregate.output_schema.clone(),
))
} else {
Ok(LocalPhysicalPlan::hash_aggregate(
input,
aggregate.aggregations.clone(),
aggregate.groupby.clone(),
aggregate.output_schema.clone(),
))
}
}
_ => todo!("{} not yet implemented", plan.name()),
}
}
4 changes: 2 additions & 2 deletions src/daft-plan/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use daft_core::{
schema::{Schema, SchemaRef},
};
use daft_dsl::{col, ExprRef};
use daft_scan::{file_format::FileFormat, Pushdowns, ScanExternalInfo, ScanOperatorRef};
use daft_scan::{file_format::FileFormat, PhysicalScanInfo, Pushdowns, ScanOperatorRef};

#[cfg(feature = "python")]
use {
Expand Down Expand Up @@ -80,7 +80,7 @@ impl LogicalPlanBuilder {
) -> DaftResult<Self> {
let schema = scan_operator.0.schema();
let partitioning_keys = scan_operator.0.partitioning_keys();
let source_info = SourceInfo::External(ScanExternalInfo::new(
let source_info = SourceInfo::Physical(PhysicalScanInfo::new(
scan_operator.clone(),
schema.clone(),
partitioning_keys.into(),
Expand Down
2 changes: 1 addition & 1 deletion src/daft-plan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub use physical_planner::{
};
pub use resource_request::ResourceRequest;
pub use sink_info::OutputFileInfo;
pub use source_info::{FileInfo, FileInfos, InMemoryInfo};
pub use source_info::{FileInfo, FileInfos, InMemoryInfo, SourceInfo};

#[cfg(feature = "python")]
use pyo3::prelude::*;
Expand Down
4 changes: 2 additions & 2 deletions src/daft-plan/src/logical_ops/source.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;

use daft_core::schema::SchemaRef;
use daft_scan::ScanExternalInfo;
use daft_scan::PhysicalScanInfo;

use crate::source_info::SourceInfo;

Expand Down Expand Up @@ -30,7 +30,7 @@ impl Source {
let mut res = vec![];

match self.source_info.as_ref() {
SourceInfo::External(ScanExternalInfo {
SourceInfo::Physical(PhysicalScanInfo {
source_schema,
scan_op,
partitioning_keys,
Expand Down
Loading

0 comments on commit ecebb82

Please sign in to comment.