Skip to content

Commit

Permalink
[FEAT] [New Executor] [2/N] daft-execution crate + proof-of-concept c…
Browse files Browse the repository at this point in the history
…ompute ops and partition reference + metadata model for new executor. (#2340)

This PR adds the `daft-execution` subcrate containing a set of
proof-of-concept local compute ops and the partition reference +
metadata model for the new executor.

Partial metadata machinery isn't yet implemented since no task scheduler
or exchange op has required it yet.

## TODOs

- [ ] Add unit tests for compute ops.
- [ ] Add doc strings for abstractions.
  • Loading branch information
clarkzinzow authored Jun 7, 2024
1 parent 021b103 commit 22ef02b
Show file tree
Hide file tree
Showing 21 changed files with 1,743 additions and 15 deletions.
31 changes: 29 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ daft-compression = {path = "src/daft-compression", default-features = false}
daft-core = {path = "src/daft-core", default-features = false}
daft-csv = {path = "src/daft-csv", default-features = false}
daft-dsl = {path = "src/daft-dsl", default-features = false}
daft-execution = {path = "src/daft-execution", default-features = false}
daft-io = {path = "src/daft-io", default-features = false}
daft-json = {path = "src/daft-json", default-features = false}
daft-micropartition = {path = "src/daft-micropartition", default-features = false}
Expand All @@ -15,9 +16,11 @@ daft-scheduler = {path = "src/daft-scheduler", default-features = false}
daft-stats = {path = "src/daft-stats", default-features = false}
daft-table = {path = "src/daft-table", default-features = false}
lazy_static = {workspace = true}
log = {workspace = true}
lzma-sys = {version = "*", features = ["static"]}
pyo3 = {workspace = true, optional = true}
pyo3-log = {workspace = true, optional = true}
sysinfo = {workspace = true}

[features]
default = ["python"]
Expand All @@ -27,6 +30,7 @@ python = [
"daft-core/python",
"daft-csv/python",
"daft-dsl/python",
"daft-execution/python",
"daft-io/python",
"daft-json/python",
"daft-micropartition/python",
Expand Down Expand Up @@ -88,6 +92,7 @@ members = [
"src/common/daft-config",
"src/common/system-info",
"src/daft-core",
"src/daft-execution",
"src/daft-io",
"src/daft-parquet",
"src/daft-csv",
Expand All @@ -108,6 +113,7 @@ async-compression = {version = "0.4.10", features = [
"all-algorithms"
]}
async-stream = "0.3.5"
async-trait = "0.1.79"
bytes = "1.6.0"
chrono = "0.4.38"
chrono-tz = "0.8.4"
Expand All @@ -129,6 +135,7 @@ rstest = "0.18.2"
serde_json = "1.0.116"
sketches-ddsketch = {version = "0.2.2", features = ["use_serde"]}
snafu = {version = "0.7.4", features = ["futures"]}
sysinfo = "0.30.12"
tokio = {version = "1.37.0", features = [
"net",
"time",
Expand All @@ -139,7 +146,7 @@ tokio = {version = "1.37.0", features = [
"rt",
"rt-multi-thread"
]}
tokio-stream = {version = "0.1.14", features = ["fs"]}
tokio-stream = {version = "0.1.14", features = ["fs", "io-util", "time"]}
tokio-util = "0.7.11"
url = "2.4.0"

Expand Down
30 changes: 30 additions & 0 deletions src/daft-execution/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
[dependencies]
async-trait = {workspace = true}
common-error = {path = "../common/error", default-features = false}
daft-core = {path = "../daft-core", default-features = false}
daft-dsl = {path = "../daft-dsl", default-features = false}
daft-io = {path = "../daft-io", default-features = false}
daft-micropartition = {path = "../daft-micropartition", default-features = false}
daft-plan = {path = "../daft-plan", default-features = false}
daft-scan = {path = "../daft-scan", default-features = false}
futures = {workspace = true}
itertools = {workspace = true}
log = {workspace = true}
pyo3 = {workspace = true, optional = true}
rand = {workspace = true}
rayon = {workspace = true}
snafu = {workspace = true}
sysinfo = {workspace = true}
tokio = {workspace = true}

[dev-dependencies]
rstest = {workspace = true}

[features]
default = ["python"]
python = ["dep:pyo3", "common-error/python", "daft-core/python", "daft-dsl/python", "daft-io/python", "daft-micropartition/python", "daft-plan/python", "daft-scan/python"]

[package]
edition = {workspace = true}
name = "daft-execution"
version = {workspace = true}
39 changes: 39 additions & 0 deletions src/daft-execution/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#![feature(impl_trait_in_assoc_type)]
#![feature(let_chains)]
#![feature(assert_matches)]
// TODO(Clark): Remove this once stage planner, partial metadata, etc. are implemented.
#![allow(dead_code)]
#![allow(unused)]

mod ops;
mod partition;

use common_error::DaftError;
use snafu::Snafu;

#[cfg(feature = "python")]
use pyo3::prelude::*;

#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Error joining spawned task: {}", source))]
JoinError { source: tokio::task::JoinError },
#[snafu(display(
"Sender of OneShot Channel Dropped before sending data over: {}",
source
))]
OneShotRecvError {
source: tokio::sync::oneshot::error::RecvError,
},
}

impl From<Error> for DaftError {
fn from(err: Error) -> DaftError {
DaftError::External(err.into())
}
}

#[cfg(feature = "python")]
pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> {
Ok(())
}
49 changes: 49 additions & 0 deletions src/daft-execution/src/ops/filter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use std::sync::Arc;

use common_error::DaftResult;
use daft_dsl::ExprRef;
use daft_micropartition::MicroPartition;
use daft_plan::ResourceRequest;

use crate::partition::partition_ref::PartitionMetadata;

use super::PartitionTaskOp;

/// Filter task op, applying a predicate to its input.
#[derive(Debug)]
pub struct FilterOp {
predicate: Vec<ExprRef>,
resource_request: ResourceRequest,
}

impl FilterOp {
pub fn new(predicate: Vec<ExprRef>) -> Self {
Self {
predicate,
resource_request: ResourceRequest::default_cpu(),
}
}
}

impl PartitionTaskOp for FilterOp {
type Input = MicroPartition;

fn execute(&self, inputs: &[Arc<MicroPartition>]) -> DaftResult<Vec<Arc<MicroPartition>>> {
assert_eq!(inputs.len(), 1);
let input = inputs.iter().next().unwrap();
let out = input.filter(self.predicate.as_slice())?;
Ok(vec![Arc::new(out)])
}

fn resource_request(&self) -> &ResourceRequest {
&self.resource_request
}

fn partial_metadata_from_input_metadata(&self, _: &[PartitionMetadata]) -> PartitionMetadata {
todo!()
}

fn name(&self) -> &str {
"FilterOp"
}
}
Loading

0 comments on commit 22ef02b

Please sign in to comment.