diff --git a/Cargo.lock b/Cargo.lock index e8121cdf0d..d7033dc1b4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1535,6 +1535,7 @@ dependencies = [ "daft-core", "daft-csv", "daft-dsl", + "daft-execution", "daft-io", "daft-json", "daft-micropartition", @@ -1546,9 +1547,11 @@ dependencies = [ "daft-table", "lazy_static", "libc", + "log", "lzma-sys", "pyo3", "pyo3-log", + "sysinfo", "tikv-jemallocator", ] @@ -1670,6 +1673,30 @@ dependencies = [ "serde_json", ] +[[package]] +name = "daft-execution" +version = "0.2.0-dev0" +dependencies = [ + "async-trait", + "common-error", + "daft-core", + "daft-dsl", + "daft-io", + "daft-micropartition", + "daft-plan", + "daft-scan", + "futures", + "itertools 0.11.0", + "log", + "pyo3", + "rand 0.8.5", + "rayon", + "rstest", + "snafu", + "sysinfo", + "tokio", +] + [[package]] name = "daft-io" version = "0.2.0-dev0" @@ -4695,9 +4722,9 @@ dependencies = [ [[package]] name = "sysinfo" -version = "0.30.7" +version = "0.30.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c385888ef380a852a16209afc8cfad22795dd8873d69c9a14d2e2088f118d18" +checksum = "732ffa00f53e6b2af46208fba5718d9662a421049204e156328b66791ffa15ae" dependencies = [ "cfg-if", "core-foundation-sys", diff --git a/Cargo.toml b/Cargo.toml index a1e19e0ccb..21c61ff6d8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ daft-compression = {path = "src/daft-compression", default-features = false} daft-core = {path = "src/daft-core", default-features = false} daft-csv = {path = "src/daft-csv", default-features = false} daft-dsl = {path = "src/daft-dsl", default-features = false} +daft-execution = {path = "src/daft-execution", default-features = false} daft-io = {path = "src/daft-io", default-features = false} daft-json = {path = "src/daft-json", default-features = false} daft-micropartition = {path = "src/daft-micropartition", default-features = false} @@ -15,9 +16,11 @@ daft-scheduler = {path = "src/daft-scheduler", default-features = false} 
daft-stats = {path = "src/daft-stats", default-features = false} daft-table = {path = "src/daft-table", default-features = false} lazy_static = {workspace = true} +log = {workspace = true} lzma-sys = {version = "*", features = ["static"]} pyo3 = {workspace = true, optional = true} pyo3-log = {workspace = true, optional = true} +sysinfo = {workspace = true} [features] default = ["python"] @@ -27,6 +30,7 @@ python = [ "daft-core/python", "daft-csv/python", "daft-dsl/python", + "daft-execution/python", "daft-io/python", "daft-json/python", "daft-micropartition/python", @@ -88,6 +92,7 @@ members = [ "src/common/daft-config", "src/common/system-info", "src/daft-core", + "src/daft-execution", "src/daft-io", "src/daft-parquet", "src/daft-csv", @@ -108,6 +113,7 @@ async-compression = {version = "0.4.10", features = [ "all-algorithms" ]} async-stream = "0.3.5" +async-trait = "0.1.79" bytes = "1.6.0" chrono = "0.4.38" chrono-tz = "0.8.4" @@ -129,6 +135,7 @@ rstest = "0.18.2" serde_json = "1.0.116" sketches-ddsketch = {version = "0.2.2", features = ["use_serde"]} snafu = {version = "0.7.4", features = ["futures"]} +sysinfo = "0.30.12" tokio = {version = "1.37.0", features = [ "net", "time", @@ -139,7 +146,7 @@ tokio = {version = "1.37.0", features = [ "rt", "rt-multi-thread" ]} -tokio-stream = {version = "0.1.14", features = ["fs"]} +tokio-stream = {version = "0.1.14", features = ["fs", "io-util", "time"]} tokio-util = "0.7.11" url = "2.4.0" diff --git a/src/daft-execution/Cargo.toml b/src/daft-execution/Cargo.toml new file mode 100644 index 0000000000..7d0f0dd95b --- /dev/null +++ b/src/daft-execution/Cargo.toml @@ -0,0 +1,30 @@ +[dependencies] +async-trait = {workspace = true} +common-error = {path = "../common/error", default-features = false} +daft-core = {path = "../daft-core", default-features = false} +daft-dsl = {path = "../daft-dsl", default-features = false} +daft-io = {path = "../daft-io", default-features = false} +daft-micropartition = {path = 
"../daft-micropartition", default-features = false} +daft-plan = {path = "../daft-plan", default-features = false} +daft-scan = {path = "../daft-scan", default-features = false} +futures = {workspace = true} +itertools = {workspace = true} +log = {workspace = true} +pyo3 = {workspace = true, optional = true} +rand = {workspace = true} +rayon = {workspace = true} +snafu = {workspace = true} +sysinfo = {workspace = true} +tokio = {workspace = true} + +[dev-dependencies] +rstest = {workspace = true} + +[features] +default = ["python"] +python = ["dep:pyo3", "common-error/python", "daft-core/python", "daft-dsl/python", "daft-io/python", "daft-micropartition/python", "daft-plan/python", "daft-scan/python"] + +[package] +edition = {workspace = true} +name = "daft-execution" +version = {workspace = true} diff --git a/src/daft-execution/src/lib.rs b/src/daft-execution/src/lib.rs new file mode 100644 index 0000000000..e80197d398 --- /dev/null +++ b/src/daft-execution/src/lib.rs @@ -0,0 +1,39 @@ +#![feature(impl_trait_in_assoc_type)] +#![feature(let_chains)] +#![feature(assert_matches)] +// TODO(Clark): Remove this once stage planner, partial metadata, etc. are implemented. 
+#![allow(dead_code)] +#![allow(unused)] + +mod ops; +mod partition; + +use common_error::DaftError; +use snafu::Snafu; + +#[cfg(feature = "python")] +use pyo3::prelude::*; + +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("Error joining spawned task: {}", source))] + JoinError { source: tokio::task::JoinError }, + #[snafu(display( + "Sender of OneShot Channel Dropped before sending data over: {}", + source + ))] + OneShotRecvError { + source: tokio::sync::oneshot::error::RecvError, + }, +} + +impl From<Error> for DaftError { + fn from(err: Error) -> DaftError { + DaftError::External(err.into()) + } +} + +#[cfg(feature = "python")] +pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> { + Ok(()) +} diff --git a/src/daft-execution/src/ops/filter.rs b/src/daft-execution/src/ops/filter.rs new file mode 100644 index 0000000000..c9104e49ba --- /dev/null +++ b/src/daft-execution/src/ops/filter.rs @@ -0,0 +1,49 @@ +use std::sync::Arc; + +use common_error::DaftResult; +use daft_dsl::ExprRef; +use daft_micropartition::MicroPartition; +use daft_plan::ResourceRequest; + +use crate::partition::partition_ref::PartitionMetadata; + +use super::PartitionTaskOp; + +/// Filter task op, applying a predicate to its input. 
+#[derive(Debug)] +pub struct FilterOp { + predicate: Vec, + resource_request: ResourceRequest, +} + +impl FilterOp { + pub fn new(predicate: Vec) -> Self { + Self { + predicate, + resource_request: ResourceRequest::default_cpu(), + } + } +} + +impl PartitionTaskOp for FilterOp { + type Input = MicroPartition; + + fn execute(&self, inputs: &[Arc]) -> DaftResult>> { + assert_eq!(inputs.len(), 1); + let input = inputs.iter().next().unwrap(); + let out = input.filter(self.predicate.as_slice())?; + Ok(vec![Arc::new(out)]) + } + + fn resource_request(&self) -> &ResourceRequest { + &self.resource_request + } + + fn partial_metadata_from_input_metadata(&self, _: &[PartitionMetadata]) -> PartitionMetadata { + todo!() + } + + fn name(&self) -> &str { + "FilterOp" + } +} diff --git a/src/daft-execution/src/ops/fused.rs b/src/daft-execution/src/ops/fused.rs new file mode 100644 index 0000000000..32b766340a --- /dev/null +++ b/src/daft-execution/src/ops/fused.rs @@ -0,0 +1,664 @@ +use std::sync::Arc; + +use common_error::DaftResult; +use daft_micropartition::MicroPartition; +use daft_plan::ResourceRequest; +use itertools::Itertools; + +use crate::partition::partition_ref::PartitionMetadata; + +use super::PartitionTaskOp; + +/// Builder for fused task ops. Task ops are only fused if they have compatible resource requests, meaning that their +/// resource requests or homogeneous enough that we don't want to pipeline execution of each task op with the other. +#[derive(Debug)] +pub struct FusedOpBuilder { + // Task op at the front of the chain. + source_op: Arc>, + // All task ops that have been fused into chain after source op. + // The order of this vec indicates the order of chain invocation. + fused_ops: Vec>>, + // Aggregate resource request for fused task op. 
+ resource_request: ResourceRequest, +} + +impl Clone for FusedOpBuilder { + fn clone(&self) -> Self { + Self { + source_op: self.source_op.clone(), + fused_ops: self.fused_ops.clone(), + resource_request: self.resource_request.clone(), + } + } +} + +impl FusedOpBuilder { + pub fn new(source_op: Arc>) -> Self { + let resource_request = source_op.resource_request().clone(); + Self { + source_op, + fused_ops: vec![], + resource_request, + } + } + + pub fn add_op(&mut self, op: Arc>) { + assert!(self.can_add_op(op.as_ref())); + self.resource_request = self.resource_request.max(op.resource_request()); + self.fused_ops.push(op); + } + + pub fn can_add_op(&self, op: &dyn PartitionTaskOp) -> bool { + self.resource_request + .is_pipeline_compatible_with(op.resource_request()) + } + + pub fn build(self) -> Arc> { + if self.fused_ops.is_empty() { + self.source_op + } else { + Arc::new(FusedPartitionTaskOp::::new( + self.source_op, + self.fused_ops, + self.resource_request, + )) + } + } +} + +/// Finalized fused task op, containing two or more underlying task ops that will be invoked in a chain. +#[derive(Debug)] +pub struct FusedPartitionTaskOp { + // Task op at the front of the chain. + source_op: Arc>, + // All task ops that have been fused into chain after source op. + // The order of this vec indicates the order of chain invocation. + fused_ops: Vec>>, + // Aggregate resource request for fused task op; this is incrementally built via the above builder. + resource_request: ResourceRequest, + // A human-readable name for the fused task op, currently a concatenation of the fused task ops with a hyphen + // separator, e.g. ScanOp-ProjectOp-FilterOp. + name: String, +} + +impl FusedPartitionTaskOp { + pub fn new( + source_op: Arc>, + fused_ops: Vec>>, + resource_request: ResourceRequest, + ) -> Self { + // Concatenate all fused task ops with a hyphen separator. 
+ let name = std::iter::once(source_op.name()) + .chain(fused_ops.iter().map(|op| op.name())) + .join("-"); + Self { + source_op, + fused_ops, + resource_request, + name, + } + } +} + +impl PartitionTaskOp for FusedPartitionTaskOp { + type Input = T; + + fn execute(&self, inputs: &[Arc]) -> DaftResult>> { + // Execute task ops in a chain. + let mut inputs = self.source_op.execute(inputs)?; + for op in self.fused_ops.iter() { + inputs = op.execute(&inputs)?; + } + Ok(inputs) + } + + fn num_inputs(&self) -> usize { + self.source_op.num_inputs() + } + + fn num_outputs(&self) -> usize { + // Number of outputs is dictated by the last task op in the fused chain. + self.fused_ops + .last() + .map_or_else(|| self.source_op.num_outputs(), |op| op.num_outputs()) + } + + fn resource_request(&self) -> &ResourceRequest { + &self.resource_request + } + + fn resource_request_with_input_metadata( + &self, + input_meta: &[PartitionMetadata], + ) -> ResourceRequest { + // TODO(Clark): This should eventually take the max of the heap memory estimate for all fused ops in the chain, + // using max output size estimates from previous ops in the chain. This would require looping in + // the approximate stats estimate logic that's currently tied to the physical plan, which we previously talked + // about factoring out. + self.resource_request + .or_memory_bytes(input_meta.iter().map(|m| m.size_bytes).sum()) + } + + fn partial_metadata_from_input_metadata(&self, _: &[PartitionMetadata]) -> PartitionMetadata { + todo!() + } + fn with_input_metadata(&self, input_meta: &[PartitionMetadata]) { + // TODO(Clark): This should be applied to every task op in the chain, using partial metadata propagation for the + // intermediate task ops. For now, we can ensure that any task op requiring stateful configuration maintenance + // based on input metadata isn't fused with upstream task ops. 
+ self.source_op.with_input_metadata(input_meta); + } + fn with_previous_output_metadata(&self, output_meta: &[PartitionMetadata]) { + // TODO(Clark): This can't be applied to every task op in the chain, since we would need to keep track of + // output metadata for intermediate ops in the chain. For now, we limit stateful configuration maintenance to + // task ops at the end of a fused chain, and should prevent fusion for such task ops otherwise. + self.fused_ops.last().map_or_else( + || self.source_op.with_previous_output_metadata(output_meta), + |op| op.with_previous_output_metadata(output_meta), + ); + } + + fn name(&self) -> &str { + &self.name + } +} + +#[cfg(test)] +mod tests { + use std::{ + assert_matches::assert_matches, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, Mutex, + }, + }; + + use common_error::DaftResult; + use daft_micropartition::MicroPartition; + use daft_plan::ResourceRequest; + + use crate::{ops::PartitionTaskOp, partition::partition_ref::PartitionMetadata}; + + use super::FusedOpBuilder; + + #[derive(Debug)] + struct MockExecOp { + exec_log: Arc>>, + resource_request: ResourceRequest, + name: String, + } + + impl MockExecOp { + fn new(name: impl Into, exec_log: Arc>>) -> Self { + Self { + exec_log, + resource_request: Default::default(), + name: name.into(), + } + } + } + + impl PartitionTaskOp for MockExecOp { + type Input = MicroPartition; + + fn execute(&self, inputs: &[Arc]) -> DaftResult>> { + self.exec_log.lock().unwrap().push(self.name.clone()); + Ok(inputs.to_vec()) + } + + fn resource_request(&self) -> &ResourceRequest { + &self.resource_request + } + + fn partial_metadata_from_input_metadata( + &self, + input_meta: &[crate::partition::partition_ref::PartitionMetadata], + ) -> crate::partition::partition_ref::PartitionMetadata { + todo!() + } + + fn name(&self) -> &str { + &self.name + } + } + + /// Tests execution order of fused ops. 
+ #[test] + fn exec_order() -> DaftResult<()> { + let exec_log = Arc::new(Mutex::new(vec![])); + let op1 = Arc::new(MockExecOp::new("op1", exec_log.clone())); + let op2 = Arc::new(MockExecOp::new("op2", exec_log.clone())); + let op3 = Arc::new(MockExecOp::new("op3", exec_log.clone())); + + let mut builder = FusedOpBuilder::new(op1.clone()); + assert!(builder.can_add_op(op2.as_ref())); + builder.add_op(op2.clone()); + assert!(builder.can_add_op(op3.as_ref())); + builder.add_op(op3.clone()); + let fused = builder.build(); + // Upon execution, fused ops should be called in the order in which they were added to the builder. + let inputs = vec![]; + fused.execute(&inputs); + assert_eq!(exec_log.lock().unwrap().clone(), vec!["op1", "op2", "op3"]); + assert_eq!(fused.name(), "op1-op2-op3"); + + Ok(()) + } + + #[derive(Debug)] + struct MockResourceOp { + resource_request: ResourceRequest, + name: String, + } + + impl MockResourceOp { + fn new( + name: impl Into, + num_cpus: Option, + num_gpus: Option, + memory_bytes: Option, + ) -> Self { + Self { + resource_request: ResourceRequest::new_internal(num_cpus, num_gpus, memory_bytes), + name: name.into(), + } + } + } + + impl PartitionTaskOp for MockResourceOp { + type Input = MicroPartition; + + fn execute(&self, inputs: &[Arc]) -> DaftResult>> { + Ok(inputs.to_vec()) + } + + fn resource_request(&self) -> &ResourceRequest { + &self.resource_request + } + + fn partial_metadata_from_input_metadata( + &self, + input_meta: &[PartitionMetadata], + ) -> PartitionMetadata { + todo!() + } + + fn name(&self) -> &str { + &self.name + } + } + + /// Tests that tasks requesting CPU compute are fusable. + #[test] + fn both_cpu_compute_fusable() -> DaftResult<()> { + let cpu_op1 = Arc::new(MockResourceOp::new("op1", Some(1.0), None, None)); + let cpu_op2 = Arc::new(MockResourceOp::new("op2", Some(2.0), None, None)); + + // Smaller CPU request first, larger second. 
+ let mut builder = FusedOpBuilder::new(cpu_op1.clone()); + assert!(builder.can_add_op(cpu_op2.as_ref())); + builder.add_op(cpu_op2.clone()); + let fused = builder.build(); + // CPU resource request should be max. + assert_matches!(fused.resource_request().num_cpus, Some(2.0)); + assert_eq!(fused.name(), "op1-op2"); + + // Larger CPU request first, smaller second. + let mut builder = FusedOpBuilder::new(cpu_op2.clone()); + assert!(builder.can_add_op(cpu_op1.as_ref())); + builder.add_op(cpu_op1.clone()); + let fused = builder.build(); + // CPU resource request should be max. + assert_matches!(fused.resource_request().num_cpus, Some(2.0)); + assert_eq!(fused.name(), "op2-op1"); + + Ok(()) + } + + /// Tests that tasks requesting GPU compute are fusable. + #[test] + fn both_gpu_compute_fusable() -> DaftResult<()> { + let gpu_op1 = Arc::new(MockResourceOp::new("op1", None, Some(1.0), None)); + let gpu_op2 = Arc::new(MockResourceOp::new("op2", None, Some(2.0), None)); + + // Smaller GPU request first, larger second. + let mut builder = FusedOpBuilder::new(gpu_op1.clone()); + assert!(builder.can_add_op(gpu_op2.as_ref())); + builder.add_op(gpu_op2.clone()); + let fused = builder.build(); + // GPU resource request should be max. + assert_matches!(fused.resource_request().num_gpus, Some(2.0)); + assert_eq!(fused.name(), "op1-op2"); + + // Larger GPU request first, smaller second. + let mut builder = FusedOpBuilder::new(gpu_op2.clone()); + assert!(builder.can_add_op(gpu_op1.as_ref())); + builder.add_op(gpu_op1.clone()); + let fused = builder.build(); + // GPU resource request should be max. + assert_matches!(fused.resource_request().num_gpus, Some(2.0)); + assert_eq!(fused.name(), "op2-op1"); + + Ok(()) + } + + /// Tests resource request merging, where a max should be taken across each of the resource dimensions. 
+ #[test] + fn resource_request_merging() -> DaftResult<()> { + let op1 = Arc::new(MockResourceOp::new("op1", Some(1.0), None, Some(1024))); + let op2 = Arc::new(MockResourceOp::new("op2", Some(3.0), None, Some(512))); + let op3 = Arc::new(MockResourceOp::new("op3", Some(2.0), None, None)); + + let mut builder = FusedOpBuilder::new(op1.clone()); + assert!(builder.can_add_op(op2.as_ref())); + builder.add_op(op2.clone()); + assert!(builder.can_add_op(op3.as_ref())); + builder.add_op(op3.clone()); + let fused = builder.build(); + // Resource requests should contain max across all task ops. + assert_eq!( + fused.resource_request(), + &ResourceRequest::new_internal(Some(3.0), None, Some(1024)) + ); + assert_eq!(fused.name(), "op1-op2-op3"); + + Ok(()) + } + + /// Tests that input metadata supplies memory bytes request. + #[test] + fn input_metadata_memory_bytes() -> DaftResult<()> { + let op1 = Arc::new(MockResourceOp::new("op1", None, None, None)); + let op2 = Arc::new(MockResourceOp::new("op2", None, None, None)); + let input_metadata = vec![PartitionMetadata::new(None, Some(1024))]; + + let mut builder = FusedOpBuilder::new(op1.clone()); + assert!(builder.can_add_op(op2.as_ref())); + builder.add_op(op2.clone()); + let fused = builder.build(); + // Memory bytes request should be equal to size given in input metadata. + assert_matches!( + fused + .resource_request_with_input_metadata(&input_metadata) + .memory_bytes, + Some(1024), + ); + assert_eq!(fused.name(), "op1-op2"); + + Ok(()) + } + + /// Tests that an explicit memory bytes request overrides input metadata. 
+ #[test] + fn explicit_memory_bytes_overrides_input_metadata() -> DaftResult<()> { + let op1 = Arc::new(MockResourceOp::new("op1", None, None, None)); + let op2 = Arc::new(MockResourceOp::new("op2", None, None, Some(2048))); + let op3 = Arc::new(MockResourceOp::new("op3", None, None, None)); + let input_metadata = vec![PartitionMetadata::new(None, Some(1024))]; + + let mut builder = FusedOpBuilder::new(op1.clone()); + assert!(builder.can_add_op(op2.as_ref())); + builder.add_op(op2.clone()); + assert!(builder.can_add_op(op3.as_ref())); + builder.add_op(op3.clone()); + let fused = builder.build(); + // Explicit memory bytes request should override size given in input metadata. + assert_matches!( + fused + .resource_request_with_input_metadata(&input_metadata) + .memory_bytes, + Some(2048), + ); + assert_eq!(fused.name(), "op1-op2-op3"); + + Ok(()) + } + + /// Tests that CPU and GPU task ops aren't fused. + #[test] + fn cpu_and_gpu_task_not_fused() -> DaftResult<()> { + // CPU -> GPU. + let cpu_op = Arc::new(MockResourceOp::new("cpu_op", Some(1.0), None, None)); + let gpu_op = Arc::new(MockResourceOp::new("gpu_op", None, Some(1.0), None)); + let builder = FusedOpBuilder::new(cpu_op.clone()); + assert!(!builder.can_add_op(gpu_op.as_ref())); + + // GPU -> CPU. + let builder = FusedOpBuilder::new(gpu_op); + assert!(!builder.can_add_op(cpu_op.as_ref())); + Ok(()) + } + + /// Tests that CPU and GPU task ops aren't fused, even if separated by several non-conflicting tasks. + #[test] + fn cpu_and_gpu_task_not_fused_long_chain() -> DaftResult<()> { + let cpu_op = Arc::new(MockResourceOp::new("cpu_op", Some(1.0), None, None)); + let light_op = Arc::new(MockResourceOp::new("light_op", None, None, None)); + let gpu_op = Arc::new(MockResourceOp::new("gpu_op", None, Some(1.0), None)); + + // CPU op as source, lightweight op as intermediate, GPU op as attempted sink op. 
+ let mut builder = FusedOpBuilder::new(cpu_op.clone()); + assert!(builder.can_add_op(light_op.as_ref())); + builder.add_op(light_op.clone()); + assert!(!builder.can_add_op(gpu_op.as_ref())); + assert_eq!(builder.build().name(), "cpu_op-light_op"); + + // Lightweight op as source, CPU as intermediate, GPU op as attempted sink op. + let mut builder = FusedOpBuilder::new(light_op.clone()); + assert!(builder.can_add_op(cpu_op.as_ref())); + builder.add_op(cpu_op.clone()); + assert!(!builder.can_add_op(gpu_op.as_ref())); + assert_eq!(builder.build().name(), "light_op-cpu_op"); + + // GPU op as source, lightweight op as intermediate, CPU op as attempted sink op. + let mut builder = FusedOpBuilder::new(gpu_op.clone()); + assert!(builder.can_add_op(light_op.as_ref())); + builder.add_op(light_op.clone()); + assert!(!builder.can_add_op(cpu_op.as_ref())); + assert_eq!(builder.build().name(), "gpu_op-light_op"); + + // Lightweight op as source, GPU as intermediate, CPU op as attempted sink op. + let mut builder = FusedOpBuilder::new(light_op.clone()); + assert!(builder.can_add_op(gpu_op.as_ref())); + builder.add_op(gpu_op); + assert!(!builder.can_add_op(cpu_op.as_ref())); + assert_eq!(builder.build().name(), "light_op-gpu_op"); + + Ok(()) + } + + #[derive(Debug)] + struct MockInputOutputOp { + num_inputs: usize, + num_outputs: usize, + resource_request: ResourceRequest, + name: String, + } + + impl MockInputOutputOp { + fn new(name: impl Into, num_inputs: usize, num_outputs: usize) -> Self { + Self { + num_inputs, + num_outputs, + resource_request: Default::default(), + name: name.into(), + } + } + } + + impl PartitionTaskOp for MockInputOutputOp { + type Input = MicroPartition; + + fn execute(&self, inputs: &[Arc]) -> DaftResult>> { + Ok(inputs.to_vec()) + } + + fn num_inputs(&self) -> usize { + self.num_inputs + } + + fn num_outputs(&self) -> usize { + self.num_outputs + } + + fn resource_request(&self) -> &ResourceRequest { + &self.resource_request + } + + fn 
partial_metadata_from_input_metadata( + &self, + input_meta: &[crate::partition::partition_ref::PartitionMetadata], + ) -> crate::partition::partition_ref::PartitionMetadata { + todo!() + } + + fn name(&self) -> &str { + &self.name + } + } + + /// Tests that the number of inputs is tied to first task op in fused chain. + #[test] + fn num_inputs_first_op() -> DaftResult<()> { + let op1 = Arc::new(MockInputOutputOp::new("op1", 2, 1)); + let op2 = Arc::new(MockInputOutputOp::new("op2", 1, 1)); + let op3 = Arc::new(MockInputOutputOp::new("op3", 3, 1)); + + let mut builder = FusedOpBuilder::new(op1.clone()); + assert!(builder.can_add_op(op2.as_ref())); + builder.add_op(op2.clone()); + assert!(builder.can_add_op(op3.as_ref())); + builder.add_op(op3.clone()); + let fused = builder.build(); + // Number of inputs of fused op should be equal to number of inputs for first op in chain. + assert_eq!(fused.num_inputs(), op1.num_inputs()); + assert_eq!(fused.name(), "op1-op2-op3"); + + Ok(()) + } + + /// Tests that the number of outputs is tied to last task op in fused chain. + #[test] + fn num_outputs_last_op() -> DaftResult<()> { + let op1 = Arc::new(MockInputOutputOp::new("op1", 1, 3)); + let op2 = Arc::new(MockInputOutputOp::new("op2", 1, 1)); + let op3 = Arc::new(MockInputOutputOp::new("op3", 1, 2)); + + let mut builder = FusedOpBuilder::new(op1.clone()); + assert!(builder.can_add_op(op2.as_ref())); + builder.add_op(op2.clone()); + assert!(builder.can_add_op(op3.as_ref())); + builder.add_op(op3.clone()); + let fused = builder.build(); + // Number of outputs of fused op should be equal to number of outputs for last op in chain. 
+ assert_eq!(fused.num_outputs(), op3.num_outputs()); + assert_eq!(fused.name(), "op1-op2-op3"); + + Ok(()) + } + + #[derive(Debug)] + struct MockMutableOp { + num_with_input_metadatas: AtomicUsize, + num_with_output_metadatas: AtomicUsize, + resource_request: ResourceRequest, + name: String, + } + + impl MockMutableOp { + fn new(name: impl Into) -> Self { + Self { + num_with_input_metadatas: AtomicUsize::new(0), + num_with_output_metadatas: AtomicUsize::new(0), + resource_request: Default::default(), + name: name.into(), + } + } + } + + impl PartitionTaskOp for MockMutableOp { + type Input = MicroPartition; + + fn execute(&self, inputs: &[Arc]) -> DaftResult>> { + Ok(inputs.to_vec()) + } + + fn resource_request(&self) -> &ResourceRequest { + &self.resource_request + } + + fn partial_metadata_from_input_metadata( + &self, + input_meta: &[crate::partition::partition_ref::PartitionMetadata], + ) -> crate::partition::partition_ref::PartitionMetadata { + todo!() + } + + fn with_input_metadata(&self, _: &[PartitionMetadata]) { + self.num_with_input_metadatas.fetch_add(1, Ordering::SeqCst); + } + + fn with_previous_output_metadata(&self, _: &[PartitionMetadata]) { + self.num_with_output_metadatas + .fetch_add(1, Ordering::SeqCst); + } + + fn name(&self) -> &str { + &self.name + } + } + + /// Tests that the with_input_metadata() is propagated to first task op in fused chain. 
+ #[test] + fn with_input_metadata_first_op() -> DaftResult<()> { + let op1 = Arc::new(MockMutableOp::new("op1")); + let op2 = Arc::new(MockMutableOp::new("op2")); + let op3 = Arc::new(MockMutableOp::new("op3")); + let input_metadata = vec![PartitionMetadata::new(None, Some(1024))]; + + let mut builder = FusedOpBuilder::new(op1.clone()); + assert!(builder.can_add_op(op2.as_ref())); + builder.add_op(op2.clone()); + assert!(builder.can_add_op(op3.as_ref())); + builder.add_op(op3.clone()); + let fused = builder.build(); + // with_input_metadata() should propagate to ONLY the first task op in the fused chain. + fused.with_input_metadata(&input_metadata); + assert_eq!(op1.num_with_input_metadatas.load(Ordering::SeqCst), 1); + assert_eq!(op2.num_with_input_metadatas.load(Ordering::SeqCst), 0); + assert_eq!(op3.num_with_input_metadatas.load(Ordering::SeqCst), 0); + assert_eq!(op1.num_with_output_metadatas.load(Ordering::SeqCst), 0); + assert_eq!(op2.num_with_output_metadatas.load(Ordering::SeqCst), 0); + assert_eq!(op3.num_with_output_metadatas.load(Ordering::SeqCst), 0); + assert_eq!(fused.name(), "op1-op2-op3"); + + Ok(()) + } + + /// Tests that the with_previous_output_metadata() is propagated to last task op in fused chain. + #[test] + fn with_previous_output_metadata_last_op() -> DaftResult<()> { + let op1 = Arc::new(MockMutableOp::new("op1")); + let op2 = Arc::new(MockMutableOp::new("op2")); + let op3 = Arc::new(MockMutableOp::new("op3")); + let input_metadata = vec![PartitionMetadata::new(None, Some(1024))]; + + let mut builder = FusedOpBuilder::new(op1.clone()); + assert!(builder.can_add_op(op2.as_ref())); + builder.add_op(op2.clone()); + assert!(builder.can_add_op(op3.as_ref())); + builder.add_op(op3.clone()); + let fused = builder.build(); + // with_previous_output_metadata() should propagate to ONLY the last task op in the fused chain. 
+ fused.with_previous_output_metadata(&input_metadata); + assert_eq!(op1.num_with_output_metadatas.load(Ordering::SeqCst), 0); + assert_eq!(op2.num_with_output_metadatas.load(Ordering::SeqCst), 0); + assert_eq!(op3.num_with_output_metadatas.load(Ordering::SeqCst), 1); + assert_eq!(op1.num_with_input_metadatas.load(Ordering::SeqCst), 0); + assert_eq!(op2.num_with_input_metadatas.load(Ordering::SeqCst), 0); + assert_eq!(op3.num_with_input_metadatas.load(Ordering::SeqCst), 0); + assert_eq!(fused.name(), "op1-op2-op3"); + + Ok(()) + } +} diff --git a/src/daft-execution/src/ops/join.rs b/src/daft-execution/src/ops/join.rs new file mode 100644 index 0000000000..6474ece3e3 --- /dev/null +++ b/src/daft-execution/src/ops/join.rs @@ -0,0 +1,61 @@ +use std::sync::Arc; + +use common_error::DaftResult; +use daft_core::JoinType; +use daft_dsl::ExprRef; +use daft_micropartition::MicroPartition; +use daft_plan::ResourceRequest; + +use crate::partition::partition_ref::PartitionMetadata; + +use super::PartitionTaskOp; + +/// Hash join task op. 
+#[derive(Debug)] +pub struct HashJoinOp { + left_on: Vec, + right_on: Vec, + join_type: JoinType, + resource_request: ResourceRequest, +} + +impl HashJoinOp { + pub fn new(left_on: Vec, right_on: Vec, join_type: JoinType) -> Self { + Self { + left_on, + right_on, + join_type, + resource_request: ResourceRequest::default_cpu(), + } + } +} + +impl PartitionTaskOp for HashJoinOp { + type Input = MicroPartition; + + fn execute(&self, inputs: &[Arc]) -> DaftResult>> { + assert_eq!(inputs.len(), 2); + let mut input_iter = inputs.iter(); + let left = input_iter.next().unwrap(); + let right = input_iter.next().unwrap(); + let out = left.hash_join( + right.as_ref(), + &self.left_on, + &self.right_on, + self.join_type, + )?; + Ok(vec![Arc::new(out)]) + } + + fn resource_request(&self) -> &ResourceRequest { + &self.resource_request + } + + fn partial_metadata_from_input_metadata(&self, _: &[PartitionMetadata]) -> PartitionMetadata { + todo!() + } + + fn name(&self) -> &str { + "HashJoinOp" + } +} diff --git a/src/daft-execution/src/ops/limit.rs b/src/daft-execution/src/ops/limit.rs new file mode 100644 index 0000000000..f790aefd85 --- /dev/null +++ b/src/daft-execution/src/ops/limit.rs @@ -0,0 +1,53 @@ +use std::sync::Arc; + +use common_error::DaftResult; +use daft_micropartition::MicroPartition; +use daft_plan::ResourceRequest; + +use crate::partition::partition_ref::PartitionMetadata; + +use super::PartitionTaskOp; + +/// Limit task op. 
+#[derive(Debug)] +pub struct LimitOp { + limit: usize, + resource_request: ResourceRequest, +} + +impl LimitOp { + pub fn new(limit: usize) -> Self { + Self { + limit, + resource_request: ResourceRequest::default_cpu(), + } + } +} + +impl PartitionTaskOp for LimitOp { + type Input = MicroPartition; + + fn execute(&self, inputs: &[Arc]) -> DaftResult>> { + assert_eq!(inputs.len(), 1); + let input = inputs.iter().next().unwrap(); + let out = input.head(self.limit)?; + Ok(vec![Arc::new(out)]) + } + + fn resource_request(&self) -> &ResourceRequest { + &self.resource_request + } + + fn partial_metadata_from_input_metadata( + &self, + input_meta: &[PartitionMetadata], + ) -> PartitionMetadata { + assert_eq!(input_meta.len(), 1); + let input_meta = &input_meta[0]; + input_meta.with_num_rows(Some(self.limit)) + } + + fn name(&self) -> &str { + "LimitOp" + } +} diff --git a/src/daft-execution/src/ops/mod.rs b/src/daft-execution/src/ops/mod.rs new file mode 100644 index 0000000000..9d589b2047 --- /dev/null +++ b/src/daft-execution/src/ops/mod.rs @@ -0,0 +1,71 @@ +pub mod filter; +mod fused; +pub mod join; +pub mod limit; +pub mod monotonically_increasing_id; +pub mod project; +pub mod scan; +pub mod shuffle; +pub mod sort; + +pub use fused::FusedOpBuilder; + +use std::sync::Arc; + +use common_error::DaftResult; +use daft_micropartition::MicroPartition; +use daft_plan::ResourceRequest; + +use crate::partition::partition_ref::PartitionMetadata; + +/// Local task operator that takes one or more inputs (either scan tasks or micropartitions) and produces +/// one or more outputs (micropartitions). These are the core compute kernels of the execution model, designed +/// to execute locally over a single partition from each upstream source (single input for unary ops, two inputs for +/// binary ops, n inputs for fan-in ops). +pub trait PartitionTaskOp: std::fmt::Debug + Send + Sync { + // The type of inputs for execution. Can be either ScanTasks or MicroPartitions. 
+ type Input; + + /// Execute the underlying op on the provided inputs, producing output micropartitions. + fn execute(&self, inputs: &[Arc]) -> DaftResult>>; + + /// Number of outputs produced by this op. Defaults to 1. + fn num_outputs(&self) -> usize { + 1 + } + + /// Number of inputs that this op takes. Defaults to 1. + fn num_inputs(&self) -> usize { + 1 + } + + /// Resource request for a task executing this op; this resource request isn't bound to any input metadata. + fn resource_request(&self) -> &ResourceRequest; + + /// Resource request for a task executing this op, bound to the metadata for the input that will be provided to + /// this task. + fn resource_request_with_input_metadata( + &self, + input_meta: &[PartitionMetadata], + ) -> ResourceRequest { + self.resource_request() + .or_memory_bytes(input_meta.iter().map(|m| m.size_bytes).sum()) + } + + /// The partially-specified metadata derived from the input metadata. + fn partial_metadata_from_input_metadata( + &self, + input_meta: &[PartitionMetadata], + ) -> PartitionMetadata; + + /// Provides the input metadata for the next task; this allows the task op to internally maintain stateful + /// configuration for the next task to be submitted (e.g. the partition number for montonically increasing ID). + fn with_input_metadata(&self, _: &[PartitionMetadata]) {} + + /// Provides the output metadata of the previous task executed for this op; this allows the task op to internally + /// maintain stateful configuration for the next task to be submitted (e.g. the total number of rows for the row number op). + fn with_previous_output_metadata(&self, _: &[PartitionMetadata]) {} + + /// A human-readable name for the task op. 
+ fn name(&self) -> &str; +} diff --git a/src/daft-execution/src/ops/monotonically_increasing_id.rs b/src/daft-execution/src/ops/monotonically_increasing_id.rs new file mode 100644 index 0000000000..9795333c64 --- /dev/null +++ b/src/daft-execution/src/ops/monotonically_increasing_id.rs @@ -0,0 +1,60 @@ +use std::sync::{ + atomic::{AtomicI64, Ordering}, + Arc, +}; + +use common_error::DaftResult; +use daft_micropartition::MicroPartition; +use daft_plan::ResourceRequest; + +use crate::partition::partition_ref::PartitionMetadata; + +use super::PartitionTaskOp; + +/// Monotonically increasing ID task op. +#[derive(Debug)] +pub struct MonotonicallyIncreasingIdOp { + column_name: String, + num_partitions: AtomicI64, + resource_request: ResourceRequest, +} + +impl MonotonicallyIncreasingIdOp { + pub fn new(column_name: String) -> Self { + Self { + column_name, + num_partitions: AtomicI64::new(-1), + resource_request: ResourceRequest::default_cpu(), + } + } +} + +impl PartitionTaskOp for MonotonicallyIncreasingIdOp { + type Input = MicroPartition; + + fn execute(&self, inputs: &[Arc]) -> DaftResult>> { + assert_eq!(inputs.len(), 1); + let input = inputs.iter().next().unwrap(); + let out = input.add_monotonically_increasing_id( + self.num_partitions.load(Ordering::SeqCst) as u64, + &self.column_name, + )?; + Ok(vec![Arc::new(out)]) + } + + fn resource_request(&self) -> &ResourceRequest { + &self.resource_request + } + + fn partial_metadata_from_input_metadata(&self, _: &[PartitionMetadata]) -> PartitionMetadata { + todo!() + } + + fn with_input_metadata(&self, _: &[PartitionMetadata]) { + self.num_partitions.fetch_add(1, Ordering::SeqCst); + } + + fn name(&self) -> &str { + "MonotonicallyIncreasingIdOp" + } +} diff --git a/src/daft-execution/src/ops/project.rs b/src/daft-execution/src/ops/project.rs new file mode 100644 index 0000000000..fe9500913e --- /dev/null +++ b/src/daft-execution/src/ops/project.rs @@ -0,0 +1,49 @@ +use std::sync::Arc; + +use 
common_error::DaftResult; +use daft_dsl::ExprRef; +use daft_micropartition::MicroPartition; +use daft_plan::ResourceRequest; + +use crate::partition::partition_ref::PartitionMetadata; + +use super::PartitionTaskOp; + +/// Project task op, applying a projection to its input. +#[derive(Debug)] +pub struct ProjectOp { + projection: Vec, + resource_request: ResourceRequest, +} + +impl ProjectOp { + pub fn new(projection: Vec, resource_request: ResourceRequest) -> Self { + Self { + projection, + resource_request, + } + } +} + +impl PartitionTaskOp for ProjectOp { + type Input = MicroPartition; + + fn execute(&self, inputs: &[Arc]) -> DaftResult>> { + assert_eq!(inputs.len(), 1); + let input = inputs.iter().next().unwrap(); + let out = input.eval_expression_list(self.projection.as_slice())?; + Ok(vec![Arc::new(out)]) + } + + fn resource_request(&self) -> &ResourceRequest { + &self.resource_request + } + + fn partial_metadata_from_input_metadata(&self, _: &[PartitionMetadata]) -> PartitionMetadata { + todo!() + } + + fn name(&self) -> &str { + "ProjectOp" + } +} diff --git a/src/daft-execution/src/ops/scan.rs b/src/daft-execution/src/ops/scan.rs new file mode 100644 index 0000000000..e8ae3ad54d --- /dev/null +++ b/src/daft-execution/src/ops/scan.rs @@ -0,0 +1,52 @@ +use std::sync::Arc; + +use common_error::DaftResult; +use daft_io::IOStatsContext; +use daft_micropartition::MicroPartition; +use daft_plan::ResourceRequest; +use daft_scan::ScanTask; + +use crate::partition::partition_ref::PartitionMetadata; + +use super::PartitionTaskOp; + +/// Scan task op, executing provided scan tasks in order to produce eagerly or lazily materialized MicroPartitions. 
+#[derive(Debug)] +pub struct ScanOp { + resource_request: ResourceRequest, +} + +impl ScanOp { + pub fn new() -> Self { + Self { + resource_request: ResourceRequest::default_cpu(), + } + } +} + +impl PartitionTaskOp for ScanOp { + type Input = ScanTask; + + fn execute(&self, inputs: &[Arc]) -> DaftResult>> { + assert!(inputs.len() == 1); + let scan_task = inputs.iter().next().unwrap(); + let io_stats = IOStatsContext::new(format!( + "MicroPartition::from_scan_task for {:?}", + scan_task.sources + )); + let out = MicroPartition::from_scan_task(scan_task.clone(), io_stats)?; + Ok(vec![Arc::new(out)]) + } + + fn resource_request(&self) -> &ResourceRequest { + &self.resource_request + } + + fn partial_metadata_from_input_metadata(&self, _: &[PartitionMetadata]) -> PartitionMetadata { + todo!() + } + + fn name(&self) -> &str { + "ScanOp" + } +} diff --git a/src/daft-execution/src/ops/shuffle.rs b/src/daft-execution/src/ops/shuffle.rs new file mode 100644 index 0000000000..a041d95a3b --- /dev/null +++ b/src/daft-execution/src/ops/shuffle.rs @@ -0,0 +1,160 @@ +use std::sync::{ + atomic::{AtomicI64, Ordering}, + Arc, +}; + +use common_error::DaftResult; +use daft_dsl::ExprRef; +use daft_micropartition::MicroPartition; +use daft_plan::ResourceRequest; + +use crate::partition::partition_ref::PartitionMetadata; + +use super::PartitionTaskOp; + +fn get_random_u64() -> u64 { + rand::random() +} + +/// Fanout hash task op. 
+#[derive(Debug)] +pub struct FanoutHashOp { + num_outputs: usize, + partition_by: Vec, + resource_request: ResourceRequest, +} + +impl FanoutHashOp { + pub fn new(num_outputs: usize, partition_by: Vec) -> Self { + Self { + num_outputs, + partition_by, + resource_request: ResourceRequest::default_cpu(), + } + } +} + +impl PartitionTaskOp for FanoutHashOp { + type Input = MicroPartition; + + fn execute(&self, inputs: &[Arc]) -> DaftResult>> { + assert!(inputs.len() == 1); + let inputs = inputs.iter().next().unwrap(); + if self.num_outputs == 1 { + return Ok(vec![inputs.clone()]); + } + let partitioned = inputs.partition_by_hash(&self.partition_by, self.num_outputs)?; + Ok(partitioned.into_iter().map(Arc::new).collect::>()) + } + + fn num_outputs(&self) -> usize { + self.num_outputs + } + + fn resource_request(&self) -> &ResourceRequest { + &self.resource_request + } + + fn partial_metadata_from_input_metadata(&self, _: &[PartitionMetadata]) -> PartitionMetadata { + todo!() + } + + fn name(&self) -> &str { + "FanoutHashOp" + } +} + +/// Fanout random task op. +#[derive(Debug)] +pub struct FanoutRandomOp { + num_outputs: usize, + partition_idx: AtomicI64, + resource_request: ResourceRequest, +} + +impl FanoutRandomOp { + pub fn new(num_outputs: usize) -> Self { + Self { + num_outputs, + partition_idx: AtomicI64::new(-1), + resource_request: ResourceRequest::default_cpu(), + } + } +} + +impl PartitionTaskOp for FanoutRandomOp { + type Input = MicroPartition; + + fn execute(&self, inputs: &[Arc]) -> DaftResult>> { + assert!(inputs.len() == 1); + let inputs = inputs.iter().next().unwrap(); + // Use partition index as seed. 
+ let seed = self.partition_idx.load(Ordering::SeqCst); + let partitioned = inputs.partition_by_random(self.num_outputs, seed as u64)?; + Ok(partitioned.into_iter().map(Arc::new).collect::>()) + } + + fn num_outputs(&self) -> usize { + self.num_outputs + } + + fn resource_request(&self) -> &ResourceRequest { + &self.resource_request + } + + fn partial_metadata_from_input_metadata(&self, _: &[PartitionMetadata]) -> PartitionMetadata { + todo!() + } + + fn with_input_metadata(&self, _: &[PartitionMetadata]) { + self.partition_idx.fetch_add(1, Ordering::SeqCst); + } + + fn name(&self) -> &str { + "FanoutHashOp" + } +} + +/// Reduce merge task op. +#[derive(Debug)] +pub struct ReduceMergeOp { + num_inputs: usize, + resource_request: ResourceRequest, +} + +impl ReduceMergeOp { + pub fn new(num_inputs: usize) -> Self { + Self { + num_inputs, + resource_request: ResourceRequest::default_cpu(), + } + } +} + +impl PartitionTaskOp for ReduceMergeOp { + type Input = MicroPartition; + + fn execute(&self, inputs: &[Arc]) -> DaftResult>> { + let inputs = inputs + .iter() + .map(|input| input.as_ref()) + .collect::>(); + Ok(vec![Arc::new(MicroPartition::concat(inputs.as_slice())?)]) + } + + fn num_inputs(&self) -> usize { + self.num_inputs + } + + fn resource_request(&self) -> &ResourceRequest { + &self.resource_request + } + + fn partial_metadata_from_input_metadata(&self, _: &[PartitionMetadata]) -> PartitionMetadata { + todo!() + } + + fn name(&self) -> &str { + "ReduceMergeOp" + } +} diff --git a/src/daft-execution/src/ops/sort.rs b/src/daft-execution/src/ops/sort.rs new file mode 100644 index 0000000000..bca805e6a7 --- /dev/null +++ b/src/daft-execution/src/ops/sort.rs @@ -0,0 +1,251 @@ +use std::sync::Arc; + +use common_error::DaftResult; +use daft_dsl::{col, ExprRef}; +use daft_io::IOStatsContext; +use daft_micropartition::MicroPartition; +use daft_plan::ResourceRequest; + +use crate::partition::partition_ref::PartitionMetadata; + +use super::PartitionTaskOp; + +/// 
Boundary sampling task op. +#[derive(Debug)] +pub struct BoundarySamplingOp { + size: usize, + sort_by: Vec, + resource_request: ResourceRequest, +} + +impl BoundarySamplingOp { + pub fn new(size: usize, sort_by: Vec) -> Self { + Self { + size, + sort_by, + resource_request: ResourceRequest::default_cpu(), + } + } +} + +impl PartitionTaskOp for BoundarySamplingOp { + type Input = MicroPartition; + + fn execute(&self, mut inputs: &[Arc]) -> DaftResult>> { + assert_eq!(inputs.len(), 1); + let input = inputs.iter().next().unwrap(); + let predicate = self + .sort_by + .iter() + .map(|e| col(e.name()).not_null()) + .collect::>(); + let out = input + .sample_by_size(self.size, false, None)? + .eval_expression_list(self.sort_by.as_slice())? + .filter(predicate.as_slice())?; + Ok(vec![Arc::new(out)]) + } + + fn resource_request(&self) -> &ResourceRequest { + &self.resource_request + } + + fn partial_metadata_from_input_metadata(&self, _: &[PartitionMetadata]) -> PartitionMetadata { + todo!() + } + + fn name(&self) -> &str { + "BoundarySamplingOp" + } +} + +/// Samples to quantiles task op. 
+#[derive(Debug)] +pub struct SamplesToQuantilesOp { + num_quantiles: usize, + sort_by: Vec, + descending: Vec, + num_inputs: usize, + resource_request: ResourceRequest, +} + +impl SamplesToQuantilesOp { + pub fn new( + num_quantiles: usize, + sort_by: Vec, + descending: Vec, + num_inputs: usize, + ) -> Self { + Self { + num_quantiles, + sort_by, + descending, + num_inputs, + resource_request: ResourceRequest::default_cpu(), + } + } +} + +impl PartitionTaskOp for SamplesToQuantilesOp { + type Input = MicroPartition; + + fn execute(&self, inputs: &[Arc]) -> DaftResult>> { + let inputs = inputs + .iter() + .map(|input| input.as_ref()) + .collect::>(); + let input = MicroPartition::concat(inputs.as_slice())?; + let sort_by = self + .sort_by + .iter() + .map(|e| col(e.name())) + .collect::>(); + let merge_sorted = input.sort(sort_by.as_slice(), self.descending.as_slice())?; + let out = merge_sorted.quantiles(self.num_quantiles)?; + Ok(vec![Arc::new(out)]) + } + + fn num_inputs(&self) -> usize { + self.num_inputs + } + + fn resource_request(&self) -> &ResourceRequest { + &self.resource_request + } + + fn partial_metadata_from_input_metadata(&self, _: &[PartitionMetadata]) -> PartitionMetadata { + todo!() + } + + fn name(&self) -> &str { + "SamplesToQuantilesOp" + } +} + +/// Fanout range task op. 
+#[derive(Debug)] +pub struct FanoutRangeOp { + num_outputs: usize, + sort_by: Vec, + descending: Vec, + resource_request: ResourceRequest, +} + +impl FanoutRangeOp { + pub fn new(num_outputs: usize, sort_by: Vec, descending: Vec) -> Self { + Self { + num_outputs, + sort_by, + descending, + resource_request: ResourceRequest::default_cpu(), + } + } +} + +impl PartitionTaskOp for FanoutRangeOp { + type Input = MicroPartition; + + fn execute(&self, inputs: &[Arc]) -> DaftResult>> { + assert!(inputs.len() == 2); + let mut input_iter = inputs.iter(); + let boundaries = input_iter.next().unwrap(); + let inputs = input_iter.next().unwrap(); + if self.num_outputs == 1 { + return Ok(vec![inputs.clone()]); + } + let io_stats = IOStatsContext::new("MicroPartition::to_table"); + let boundaries = boundaries.concat_or_get(io_stats)?; + let boundaries = match &boundaries.as_ref()[..] { + [table] => table, + _ => unreachable!(), + }; + log::warn!("Boundaries num rows: {}", boundaries.len()); + let partitioned = inputs.partition_by_range(&self.sort_by, boundaries, &self.descending)?; + log::warn!("Partitioned num MicroPartitions: {}", partitioned.len()); + assert!(!partitioned.is_empty()); + let schema = partitioned[0].schema(); + let mut partitioned = partitioned.into_iter().map(Arc::new).collect::>(); + if partitioned.len() != self.num_outputs { + partitioned.extend( + std::iter::repeat(Arc::new(MicroPartition::empty(Some(schema)))) + .take(self.num_outputs - partitioned.len()), + ); + } + log::warn!( + "Partitioned num MicroPartitions after padding: {}", + partitioned.len() + ); + Ok(partitioned) + } + + fn num_inputs(&self) -> usize { + 2 + } + + fn num_outputs(&self) -> usize { + self.num_outputs + } + + fn resource_request(&self) -> &ResourceRequest { + &self.resource_request + } + + fn partial_metadata_from_input_metadata(&self, _: &[PartitionMetadata]) -> PartitionMetadata { + todo!() + } + + fn name(&self) -> &str { + "FanoutRangeOp" + } +} + +/// Sorted merge task op. 
+#[derive(Debug)] +pub struct SortedMergeOp { + num_inputs: usize, + sort_by: Vec, + descending: Vec, + resource_request: ResourceRequest, +} + +impl SortedMergeOp { + pub fn new(num_inputs: usize, sort_by: Vec, descending: Vec) -> Self { + Self { + num_inputs, + sort_by, + descending, + resource_request: ResourceRequest::default_cpu(), + } + } +} + +impl PartitionTaskOp for SortedMergeOp { + type Input = MicroPartition; + + fn execute(&self, inputs: &[Arc]) -> DaftResult>> { + let inputs = inputs + .iter() + .map(|input| input.as_ref()) + .collect::>(); + let concated = MicroPartition::concat(inputs.as_slice())?; + concated + .sort(&self.sort_by, &self.descending) + .map(|mp| vec![Arc::new(mp)]) + } + + fn num_inputs(&self) -> usize { + self.num_inputs + } + + fn resource_request(&self) -> &ResourceRequest { + &self.resource_request + } + + fn partial_metadata_from_input_metadata(&self, _: &[PartitionMetadata]) -> PartitionMetadata { + todo!() + } + + fn name(&self) -> &str { + "FanoutRangeOp" + } +} diff --git a/src/daft-execution/src/partition/mod.rs b/src/daft-execution/src/partition/mod.rs new file mode 100644 index 0000000000..105a6fff09 --- /dev/null +++ b/src/daft-execution/src/partition/mod.rs @@ -0,0 +1,3 @@ +pub mod partition_ref; + +pub use partition_ref::PartitionRef; diff --git a/src/daft-execution/src/partition/partition_ref.rs b/src/daft-execution/src/partition/partition_ref.rs new file mode 100644 index 0000000000..a44de74708 --- /dev/null +++ b/src/daft-execution/src/partition/partition_ref.rs @@ -0,0 +1,68 @@ +use std::sync::Arc; + +use daft_micropartition::MicroPartition; + +/// A reference to a partition of the table. +/// +/// If using the local executor, this partition lives in local memory. +/// If using the Ray executor, this partition lives in the memory of some worker in the cluster. +pub trait PartitionRef: std::fmt::Debug + Clone + Send + 'static { + /// Get the metadata for this partition. 
+ fn metadata(&self) -> PartitionMetadata; + + /// Materialize the partition that underlies the reference. + fn partition(&self) -> Arc; +} + +/// Metadata for a partition. +#[derive(Debug, Clone)] +pub struct PartitionMetadata { + // Number of rows in partition. + pub num_rows: Option, + // Size of partition in bytes. + pub size_bytes: Option, + // pub part_col_stats: PartitionColumnStats, + // pub execution_stats: ExecutionStats, +} + +impl PartitionMetadata { + pub fn new( + num_rows: Option, + size_bytes: Option, + // part_col_stats: PartitionColumnStats, + // execution_stats: ExecutionStats, + ) -> Self { + Self { + num_rows, + size_bytes, + // part_col_stats, + // execution_stats, + } + } + + pub fn with_num_rows(&self, num_rows: Option) -> Self { + Self { + num_rows, + size_bytes: self.size_bytes, + } + } + + pub fn with_size_bytes(&self, size_bytes: Option) -> Self { + Self { + num_rows: self.num_rows, + size_bytes, + } + } +} + +#[derive(Debug)] +pub struct PartitionColumnStats {} + +#[derive(Debug)] +pub struct ExecutionStats { + wall_time_s: f64, + cpu_time_s: f64, + max_rss_bytes: usize, + node_id: String, + partition_id: String, +} diff --git a/src/daft-micropartition/src/lib.rs b/src/daft-micropartition/src/lib.rs index d0d305d13b..1933ab3b78 100644 --- a/src/daft-micropartition/src/lib.rs +++ b/src/daft-micropartition/src/lib.rs @@ -6,6 +6,8 @@ use snafu::Snafu; mod micropartition; mod ops; +pub use micropartition::MicroPartition; + #[cfg(feature = "python")] pub mod python; #[cfg(feature = "python")] diff --git a/src/daft-micropartition/src/micropartition.rs b/src/daft-micropartition/src/micropartition.rs index d53c49ee47..330d2b809d 100644 --- a/src/daft-micropartition/src/micropartition.rs +++ b/src/daft-micropartition/src/micropartition.rs @@ -30,6 +30,7 @@ use daft_io::{IOClient, IOConfig, IOStatsContext, IOStatsRef}; use daft_stats::TableStatistics; use daft_stats::{PartitionSpec, TableMetadata}; +#[derive(Debug)] pub(crate) enum TableState { 
Unloaded(Arc), Loaded(Arc>), @@ -59,7 +60,9 @@ impl Display for TableState { } } } -pub(crate) struct MicroPartition { + +#[derive(Debug)] +pub struct MicroPartition { /// Schema of the MicroPartition /// /// This is technically redundant with the schema in `state`: @@ -617,10 +620,18 @@ impl MicroPartition { self.schema.names() } + pub fn schema(&self) -> SchemaRef { + self.schema.clone() + } + pub fn len(&self) -> usize { self.metadata.length } + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + pub fn size_bytes(&self) -> DaftResult> { let guard = self.state.lock().unwrap(); let size_bytes = if let TableState::Loaded(tables) = guard.deref() { @@ -658,7 +669,7 @@ impl MicroPartition { } } - pub(crate) fn concat_or_get(&self, io_stats: IOStatsRef) -> crate::Result>> { + pub fn concat_or_get(&self, io_stats: IOStatsRef) -> crate::Result>> { let tables = self.tables_or_read(io_stats)?; if tables.len() <= 1 { return Ok(tables); @@ -679,7 +690,7 @@ impl MicroPartition { } } - pub(crate) fn add_monotonically_increasing_id( + pub fn add_monotonically_increasing_id( &self, partition_num: u64, column_name: &str, diff --git a/src/daft-plan/src/logical_optimization/rules/push_down_projection.rs b/src/daft-plan/src/logical_optimization/rules/push_down_projection.rs index 35c98a8013..26b64d5551 100644 --- a/src/daft-plan/src/logical_optimization/rules/push_down_projection.rs +++ b/src/daft-plan/src/logical_optimization/rules/push_down_projection.rs @@ -122,7 +122,7 @@ impl PushDownProjection { let new_plan: LogicalPlan = Project::try_new( upstream_projection.input.clone(), merged_projection, - ResourceRequest::max(&[ + ResourceRequest::max_all(&[ &upstream_projection.resource_request, &projection.resource_request, ]), diff --git a/src/daft-plan/src/resource_request.rs b/src/daft-plan/src/resource_request.rs index 3dc402df08..9994230617 100644 --- a/src/daft-plan/src/resource_request.rs +++ b/src/daft-plan/src/resource_request.rs @@ -31,6 +31,52 @@ impl ResourceRequest 
{ } } + pub fn default_cpu() -> Self { + Self::new_internal(Some(1.0), None, None) + } + + pub fn with_num_cpus(&self, num_cpus: Option) -> Self { + Self { + num_cpus, + ..self.clone() + } + } + + pub fn or_num_cpus(&self, num_cpus: Option) -> Self { + Self { + num_cpus: self.num_cpus.or(num_cpus), + ..self.clone() + } + } + + pub fn with_num_gpus(&self, num_gpus: Option) -> Self { + Self { + num_gpus, + ..self.clone() + } + } + + pub fn or_num_gpus(&self, num_gpus: Option) -> Self { + Self { + num_gpus: self.num_gpus.or(num_gpus), + ..self.clone() + } + } + + pub fn with_memory_bytes(&self, memory_bytes: Option) -> Self { + Self { + memory_bytes, + ..self.clone() + } + } + + pub fn or_memory_bytes(&self, memory_bytes: Option) -> Self { + Self { + memory_bytes: self.memory_bytes.or(memory_bytes), + ..self.clone() + } + } + pub fn has_any(&self) -> bool { self.num_cpus.is_some() || self.num_gpus.is_some() || self.memory_bytes.is_some() } @@ -49,13 +95,37 @@ impl ResourceRequest { requests } - pub fn max(resource_requests: &[&Self]) -> Self { - resource_requests.iter().fold(Default::default(), |acc, e| { - let max_num_cpus = lift(float_max, acc.num_cpus, e.num_cpus); - let max_num_gpus = lift(float_max, acc.num_gpus, e.num_gpus); - let max_memory_bytes = lift(std::cmp::max, acc.memory_bytes, e.memory_bytes); - Self::new_internal(max_num_cpus, max_num_gpus, max_memory_bytes) - }) + /// Checks whether other is pipeline-compatible with self, i.e the resource requests are homogeneous enough that + /// we don't want to pipeline tasks that have these resource requests with each other. + /// + /// Currently, this returns true unless one resource request has a non-zero CPU request and the other task has a + /// non-zero GPU request. 
+ pub fn is_pipeline_compatible_with(&self, other: &ResourceRequest) -> bool { + let self_num_cpus = self.num_cpus; + let self_num_gpus = self.num_gpus; + let other_num_cpus = other.num_cpus; + let other_num_gpus = other.num_gpus; + match (self_num_cpus, self_num_gpus, other_num_cpus, other_num_gpus) { + (_, Some(n_gpus), Some(n_cpus), _) | (Some(n_cpus), _, _, Some(n_gpus)) + if n_gpus > 0.0 && n_cpus > 0.0 => + { + false + } + (_, _, _, _) => true, + } + } + + pub fn max(&self, other: &ResourceRequest) -> Self { + let max_num_cpus = lift(float_max, self.num_cpus, other.num_cpus); + let max_num_gpus = lift(float_max, self.num_gpus, other.num_gpus); + let max_memory_bytes = lift(std::cmp::max, self.memory_bytes, other.memory_bytes); + Self::new_internal(max_num_cpus, max_num_gpus, max_memory_bytes) + } + + pub fn max_all(resource_requests: &[&Self]) -> Self { + resource_requests + .iter() + .fold(Default::default(), |acc, e| acc.max(e)) } } @@ -110,7 +180,7 @@ impl ResourceRequest { /// Take a field-wise max of the list of resource requests. #[staticmethod] pub fn max_resources(resource_requests: Vec) -> Self { - Self::max(&resource_requests.iter().collect::>()) + Self::max_all(&resource_requests.iter().collect::>()) } #[getter] diff --git a/src/lib.rs b/src/lib.rs index b3db2d092f..240459901b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -61,6 +61,7 @@ pub mod pylib { common_system_info::register_modules(_py, m)?; daft_core::register_modules(_py, m)?; daft_core::python::register_modules(_py, m)?; + daft_execution::register_modules(_py, m)?; daft_dsl::register_modules(_py, m)?; daft_table::register_modules(_py, m)?; daft_io::register_modules(_py, m)?;