[FEAT] New Local Execution Model (#2437)
Prototype for new local execution model

---------

Co-authored-by: Colin Ho <[email protected]>
3 people authored Jul 9, 2024
1 parent c3d43cf commit 0bd1d27
Showing 23 changed files with 757 additions and 14 deletions.
23 changes: 23 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

4 changes: 4 additions & 0 deletions Cargo.toml
@@ -8,6 +8,7 @@ daft-dsl = {path = "src/daft-dsl", default-features = false}
 daft-execution = {path = "src/daft-execution", default-features = false}
 daft-io = {path = "src/daft-io", default-features = false}
 daft-json = {path = "src/daft-json", default-features = false}
+daft-local-execution = {path = "src/daft-local-execution", default-features = false}
 daft-micropartition = {path = "src/daft-micropartition", default-features = false}
 daft-minhash = {path = "src/daft-minhash", default-features = false}
 daft-parquet = {path = "src/daft-parquet", default-features = false}
@@ -32,6 +33,7 @@ python = [
   "daft-csv/python",
   "daft-dsl/python",
   "daft-execution/python",
+  "daft-local-execution/python",
   "daft-io/python",
   "daft-json/python",
   "daft-micropartition/python",
@@ -97,6 +99,7 @@ members = [
   "src/common/system-info",
   "src/daft-core",
   "src/daft-execution",
+  "src/daft-local-execution",
   "src/daft-io",
   "src/daft-parquet",
   "src/daft-csv",
@@ -122,6 +125,7 @@ bytes = "1.6.0"
 chrono = "0.4.38"
 chrono-tz = "0.8.4"
 comfy-table = "7.1.1"
+dyn-clone = "1"
 futures = "0.3.30"
 html-escape = "0.2.13"
 indexmap = "2.1.0"
19 changes: 13 additions & 6 deletions daft/runners/pyrunner.py
@@ -184,13 +184,20 @@ def run_iter(
         # Finalize the logical plan and get a physical plan scheduler for translating the
         # physical plan to executable tasks.
         plan_scheduler = builder.to_physical_plan_scheduler(daft_execution_config)
-        psets = {k: v.values() for k, v in self._part_set_cache.get_all_partition_sets().items()}
-        # Get executable tasks from planner.
-        tasks = plan_scheduler.to_partition_tasks(psets)
-        del psets
-        with profiler("profile_PyRunner.run_{datetime.now().isoformat()}.json"):
-            results_gen = self._physical_plan_to_partitions(tasks)
+        if daft_execution_config.enable_native_executor:
+            logger.info("Using new executor")
+            results_gen = plan_scheduler.run(
+                {k: v.values() for k, v in self._part_set_cache.get_all_partition_sets().items()}
+            )
+            yield from results_gen
+        else:
+            psets = {k: v.values() for k, v in self._part_set_cache.get_all_partition_sets().items()}
+            # Get executable tasks from planner.
+            tasks = plan_scheduler.to_partition_tasks(psets)
+            del psets
+            with profiler("profile_PyRunner.run_{datetime.now().isoformat()}.json"):
+                results_gen = self._physical_plan_to_partitions(tasks)
+        yield from results_gen
 
     def run_iter_tables(
         self, builder: LogicalPlanBuilder, results_buffer_size: int | None = None
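For context, a minimal way to exercise this new code path from Python would look like the sketch below. It assumes the `enable_native_executor` flag shown in the diff is exposed through `daft.context.set_execution_config` — an assumption about the config surface, not something this diff itself shows.

# Hedged usage sketch: opt in to the prototype native executor, then run a
# simple filter + projection query of the kind the new pipeline supports.
import daft

# Assumption: the execution-config flag from this diff is settable here.
daft.context.set_execution_config(enable_native_executor=True)

df = daft.from_pydict({"a": [1, 2, 3, 4]})
df = df.where(df["a"] > 2).select(df["a"] * 2)
print(df.collect())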
10 changes: 5 additions & 5 deletions src/daft-execution/src/stage/run.rs
@@ -1,9 +1,14 @@
 use std::{collections::HashMap, sync::Arc, thread::JoinHandle};
 
 use common_error::DaftResult;
+use daft_dsl::common_treenode::{self, TreeNode};
 use daft_micropartition::MicroPartition;
 use daft_plan::QueryStageOutput;
 
+use super::{
+    planner::physical_plan_to_stage,
+    runner::{ExchangeStageRunner, SinkStageRunner},
+};
 use crate::{
     executor::{
         local::{
@@ -17,11 +22,6 @@ use crate::{
     stage::Stage,
 };
 
-use super::{
-    planner::physical_plan_to_stage,
-    runner::{ExchangeStageRunner, SinkStageRunner},
-};
-
 /// Run a stage locally and synchronously, with all tasks executed serially.
 pub fn run_local_sync(
     query_stage: &QueryStageOutput,
25 changes: 25 additions & 0 deletions src/daft-local-execution/Cargo.toml
@@ -0,0 +1,25 @@
[dependencies]
common-error = {path = "../common/error", default-features = false}
daft-core = {path = "../daft-core", default-features = false}
daft-dsl = {path = "../daft-dsl", default-features = false}
daft-io = {path = "../daft-io", default-features = false}
daft-micropartition = {path = "../daft-micropartition", default-features = false}
daft-plan = {path = "../daft-plan", default-features = false}
daft-scan = {path = "../daft-scan", default-features = false}
dyn-clone = {workspace = true}
futures = {workspace = true}
lazy_static = {workspace = true}
log = {workspace = true}
pyo3 = {workspace = true, optional = true}
snafu = {workspace = true}
tokio = {workspace = true}
tokio-stream = {workspace = true}

[features]
default = ["python"]
python = ["dep:pyo3", "common-error/python", "daft-core/python", "daft-dsl/python", "daft-io/python", "daft-micropartition/python", "daft-plan/python", "daft-scan/python"]

[package]
edition = {workspace = true}
name = "daft-local-execution"
version = {workspace = true}
71 changes: 71 additions & 0 deletions src/daft-local-execution/src/create_pipeline.rs
@@ -0,0 +1,71 @@
use std::{collections::HashMap, sync::Arc};

use daft_dsl::Expr;
use daft_micropartition::MicroPartition;
use daft_plan::{
    physical_ops::{Aggregate, Filter, InMemoryScan, Limit, Project, TabularScan},
    PhysicalPlan,
};

use crate::{
    intermediate_ops::{filter::FilterOperator, project::ProjectOperator},
    pipeline::Pipeline,
    sinks::{aggregate::AggregateSink, limit::LimitSink},
    sources::{in_memory::InMemorySource, scan_task::ScanTaskSource},
};

pub fn physical_plan_to_pipeline(
    physical_plan: &Arc<PhysicalPlan>,
    psets: &HashMap<String, Vec<Arc<MicroPartition>>>,
) -> Pipeline {
    match physical_plan.as_ref() {
        PhysicalPlan::InMemoryScan(InMemoryScan { in_memory_info, .. }) => {
            let partitions = psets
                .get(&in_memory_info.cache_key)
                .expect("Cache key not found");
            Pipeline::new(Box::new(InMemorySource::new(partitions.clone())))
        }
        PhysicalPlan::TabularScan(TabularScan { scan_tasks, .. }) => {
            Pipeline::new(Box::new(ScanTaskSource::new(scan_tasks.clone())))
        }
        PhysicalPlan::Project(Project {
            input, projection, ..
        }) => {
            let current_pipeline = physical_plan_to_pipeline(input, psets);
            let proj_op = ProjectOperator::new(projection.clone());
            current_pipeline.with_intermediate_operator(Box::new(proj_op))
        }
        PhysicalPlan::Filter(Filter { input, predicate }) => {
            let current_pipeline = physical_plan_to_pipeline(input, psets);
            let filter_op = FilterOperator::new(predicate.clone());
            current_pipeline.with_intermediate_operator(Box::new(filter_op))
        }
        PhysicalPlan::Limit(Limit { limit, input, .. }) => {
            let current_pipeline = physical_plan_to_pipeline(input, psets);
            let sink = LimitSink::new(*limit as usize);
            let current_pipeline = current_pipeline.with_sink(Box::new(sink));

            Pipeline::new(Box::new(current_pipeline))
        }
        PhysicalPlan::Aggregate(Aggregate {
            input,
            aggregations,
            groupby,
        }) => {
            let current_pipeline = physical_plan_to_pipeline(input, psets);
            let sink = AggregateSink::new(
                aggregations
                    .iter()
                    .map(|agg| Arc::new(Expr::Agg(agg.clone())))
                    .collect::<Vec<_>>(),
                groupby.clone(),
            );
            let current_pipeline = current_pipeline.with_sink(Box::new(sink));

            Pipeline::new(Box::new(current_pipeline))
        }
        _ => {
            unimplemented!("Physical plan not supported: {}", physical_plan.name());
        }
    }
}
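The construction above is a bottom-up recursion: scans start a fresh pipeline, Project and Filter append intermediate operators to the current one, while Limit and Aggregate seal the current pipeline with a sink and wrap it as the source of an outer pipeline. A hedged sketch of the shape this produces for a hypothetical Aggregate(Filter(TabularScan)) plan — `plan`, `psets`, and the constructor arguments are assumed to be in scope, and the nesting is an illustration rather than code from this commit:

// Sketch (not from this commit): for Aggregate(Filter(TabularScan)), the
// recursion effectively builds
//
//   Pipeline::new(Box::new(
//       Pipeline::new(Box::new(ScanTaskSource::new(scan_tasks)))
//           .with_intermediate_operator(Box::new(FilterOperator::new(predicate)))
//           .with_sink(Box::new(AggregateSink::new(aggs, groupby))),
//   ))
//
// i.e. the sealed inner pipeline is itself treated as a source for the outer one.
let pipeline = physical_plan_to_pipeline(&plan, &psets);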
30 changes: 30 additions & 0 deletions src/daft-local-execution/src/intermediate_ops/filter.rs
@@ -0,0 +1,30 @@
use std::sync::Arc;

use common_error::DaftResult;
use daft_dsl::ExprRef;
use daft_micropartition::MicroPartition;

use super::intermediate_op::IntermediateOperator;

#[derive(Clone)]
pub struct FilterOperator {
    predicate: ExprRef,
}

impl FilterOperator {
    pub fn new(predicate: ExprRef) -> Self {
        Self { predicate }
    }
}

impl IntermediateOperator for FilterOperator {
    fn execute(&self, input: &Arc<MicroPartition>) -> DaftResult<Arc<MicroPartition>> {
        log::debug!("FilterOperator::execute");
        let out = input.filter(&[self.predicate.clone()])?;
        Ok(Arc::new(out))
    }

    fn name(&self) -> String {
        "FilterOperator".to_string()
    }
}
11 changes: 11 additions & 0 deletions src/daft-local-execution/src/intermediate_ops/intermediate_op.rs
@@ -0,0 +1,11 @@
use std::sync::Arc;

use common_error::DaftResult;
use daft_micropartition::MicroPartition;

pub trait IntermediateOperator: dyn_clone::DynClone + Send + Sync {
    fn execute(&self, input: &Arc<MicroPartition>) -> DaftResult<Arc<MicroPartition>>;
    fn name(&self) -> String;
}

dyn_clone::clone_trait_object!(IntermediateOperator);
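To illustrate the trait's contract, here is a hedged sketch of a hypothetical pass-through operator — not part of this commit. Any stateless, partition-wise transform follows the same pattern as the FilterOperator above and the ProjectOperator below; the sketch assumes it lives alongside them under intermediate_ops.

use std::sync::Arc;

use common_error::DaftResult;
use daft_micropartition::MicroPartition;

use super::intermediate_op::IntermediateOperator;

// Hypothetical example operator (not in this commit): forwards its input
// unchanged, showing the minimum needed to satisfy the trait. Deriving Clone
// satisfies the DynClone bound; the struct is trivially Send + Sync.
#[derive(Clone)]
pub struct NoopOperator;

impl IntermediateOperator for NoopOperator {
    fn execute(&self, input: &Arc<MicroPartition>) -> DaftResult<Arc<MicroPartition>> {
        // Operators receive one micropartition at a time and return a new one.
        Ok(input.clone())
    }

    fn name(&self) -> String {
        "NoopOperator".to_string()
    }
}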
3 changes: 3 additions & 0 deletions src/daft-local-execution/src/intermediate_ops/mod.rs
@@ -0,0 +1,3 @@
pub mod filter;
pub mod intermediate_op;
pub mod project;
30 changes: 30 additions & 0 deletions src/daft-local-execution/src/intermediate_ops/project.rs
@@ -0,0 +1,30 @@
use std::sync::Arc;

use common_error::DaftResult;
use daft_dsl::ExprRef;
use daft_micropartition::MicroPartition;

use super::intermediate_op::IntermediateOperator;

#[derive(Clone)]
pub struct ProjectOperator {
    projection: Vec<ExprRef>,
}

impl ProjectOperator {
    pub fn new(projection: Vec<ExprRef>) -> Self {
        Self { projection }
    }
}

impl IntermediateOperator for ProjectOperator {
    fn execute(&self, input: &Arc<MicroPartition>) -> DaftResult<Arc<MicroPartition>> {
        log::debug!("ProjectOperator::execute");
        let out = input.eval_expression_list(&self.projection)?;
        Ok(Arc::new(out))
    }

    fn name(&self) -> String {
        "ProjectOperator".to_string()
    }
}
47 changes: 47 additions & 0 deletions src/daft-local-execution/src/lib.rs
@@ -0,0 +1,47 @@
mod create_pipeline;
mod intermediate_ops;
mod pipeline;
mod run;
mod sinks;
mod sources;

use std::sync::Arc;

use common_error::{DaftError, DaftResult};
use daft_micropartition::MicroPartition;
pub use run::run_streaming;
use snafu::Snafu;

type Sender = tokio::sync::mpsc::Sender<DaftResult<Arc<MicroPartition>>>;
type Receiver = tokio::sync::mpsc::Receiver<DaftResult<Arc<MicroPartition>>>;

pub fn create_channel() -> (Sender, Receiver) {
    tokio::sync::mpsc::channel(1)
}

#[cfg(feature = "python")]
use pyo3::prelude::*;

#[derive(Debug, Snafu)]
pub enum Error {
    #[snafu(display("Error joining spawned task: {}", source))]
    JoinError { source: tokio::task::JoinError },
    #[snafu(display(
        "Sender of OneShot Channel Dropped before sending data over: {}",
        source
    ))]
    OneShotRecvError {
        source: tokio::sync::oneshot::error::RecvError,
    },
}

impl From<Error> for DaftError {
    fn from(err: Error) -> DaftError {
        DaftError::External(err.into())
    }
}

#[cfg(feature = "python")]
pub fn register_modules(_py: Python, _parent: &PyModule) -> PyResult<()> {
    Ok(())
}
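One detail worth calling out: `create_channel` builds a tokio mpsc channel with a buffer of one, so each pipeline stage exerts backpressure on its producer — `send` suspends until the consumer has taken the previous micropartition. A standalone sketch of that pattern with plain integers (illustration only, not code from this commit; assumes tokio with the rt and macros features):

// Capacity-1 channel: the producer can run at most one item ahead of the consumer.
#[tokio::main]
async fn main() {
    let (tx, mut rx) = tokio::sync::mpsc::channel::<u64>(1);

    let producer = tokio::spawn(async move {
        for i in 0..4 {
            // Suspends here until the receiver drains the previous value.
            tx.send(i).await.expect("receiver dropped");
        }
    });

    while let Some(v) = rx.recv().await {
        println!("got {v}");
    }
    producer.await.unwrap();
}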
(The remaining changed files in this commit were not loaded.)
