From b15b0c7f7189e4ec395cb9d0491854ceae1bfda5 Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Fri, 16 Aug 2024 15:54:10 -0700 Subject: [PATCH] [FEAT] Example Analyze for Local Execution Engine (#2648) image --- Cargo.lock | 18 +++ src/common/display/src/lib.rs | 1 + src/common/display/src/mermaid.rs | 27 +++- src/common/display/src/utils.rs | 18 +++ src/daft-io/src/stats.rs | 4 +- src/daft-local-execution/Cargo.toml | 4 +- .../src/intermediate_ops/intermediate_op.rs | 51 ++++++- src/daft-local-execution/src/lib.rs | 2 +- src/daft-local-execution/src/pipeline.rs | 18 ++- src/daft-local-execution/src/run.rs | 27 +++- src/daft-local-execution/src/runtime_stats.rs | 129 ++++++++++++++++++ .../src/sinks/blocking_sink.rs | 51 ++++++- .../src/sinks/hash_join.rs | 60 +++++++- src/daft-local-execution/src/sinks/sort.rs | 7 +- .../src/sinks/streaming_sink.rs | 47 ++++++- .../src/sources/in_memory.rs | 10 +- .../src/sources/scan_task.rs | 17 ++- .../src/sources/source.rs | 66 ++++++++- src/daft-parquet/src/read.rs | 1 + src/daft-parquet/src/stream_reader.rs | 76 ++++++++++- src/daft-plan/src/display.rs | 1 + 21 files changed, 597 insertions(+), 38 deletions(-) create mode 100644 src/common/display/src/utils.rs create mode 100644 src/daft-local-execution/src/runtime_stats.rs diff --git a/Cargo.lock b/Cargo.lock index ad1c058ab6..979d34b054 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -112,6 +112,12 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bf7d0a018de4f6aa429b9d33d69edf69072b1c5b1cb8d3e4a5f7ef898fc3eb76" +[[package]] +name = "arrayvec" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" + [[package]] name = "arrow-array" version = "51.0.0" @@ -1822,6 +1828,7 @@ version = "0.3.0-dev0" dependencies = [ "async-stream", "async-trait", + "common-display", "common-error", "common-tracing", "daft-core", @@ -1838,6 +1845,7 @@ dependencies = [ "daft-table", "futures", "lazy_static", + "num-format", "pyo3", "snafu", "tokio", @@ -3412,6 +3420,16 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "num-format" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a652d9771a63711fd3c3deb670acfbe5c30a4072e664d7a3bf5a9e1056ac72c3" +dependencies = [ + "arrayvec", + "itoa", +] + [[package]] name = "num-integer" version = "0.1.46" diff --git a/src/common/display/src/lib.rs b/src/common/display/src/lib.rs index bc8e45d197..a65ca7e691 100644 --- a/src/common/display/src/lib.rs +++ b/src/common/display/src/lib.rs @@ -3,6 +3,7 @@ use mermaid::MermaidDisplayOptions; pub mod ascii; pub mod mermaid; pub mod tree; +pub mod utils; pub trait DisplayAs { fn display_as(&self, level: DisplayLevel) -> String; diff --git a/src/common/display/src/mermaid.rs b/src/common/display/src/mermaid.rs index 1fc96e1021..f78737f284 100644 --- a/src/common/display/src/mermaid.rs +++ b/src/common/display/src/mermaid.rs @@ -14,6 +14,8 @@ pub struct MermaidDisplayOptions { /// This is useful for large trees. /// In simple mode, the display string is just the node's name. pub simple: bool, + /// Display the root node at the bottom of the diagram or at the top + pub bottom_up: bool, /// subgraph_options is used to configure the subgraph. /// Since some common displays (jupyter) don't support multiple mermaid graphs in a single cell, we need to use subgraphs. /// The subgraph_options is used to both indicate that a subgraph should be used, and to configure the subgraph. @@ -38,8 +40,12 @@ impl MermaidDisplay for T { false => DisplayLevel::Default, }; - let mut visitor = - MermaidDisplayVisitor::new(&mut s, display_type, options.subgraph_options); + let mut visitor = MermaidDisplayVisitor::new( + &mut s, + display_type, + options.bottom_up, + options.subgraph_options, + ); let _ = visitor.fmt(self); s @@ -49,6 +55,8 @@ impl MermaidDisplay for T { pub struct MermaidDisplayVisitor<'a, W> { output: &'a mut W, t: DisplayLevel, + /// Should the root node be at the bottom or the top of the diagram + bottom_up: bool, /// each node should only appear once in the tree. /// the key is the node's `multiline_display` string, and the value is the node's id. /// This is necessary because the same kind of node can appear multiple times in the tree. (such as multiple filters) @@ -59,10 +67,16 @@ pub struct MermaidDisplayVisitor<'a, W> { } impl<'a, W> MermaidDisplayVisitor<'a, W> { - pub fn new(w: &'a mut W, t: DisplayLevel, subgraph_options: Option) -> Self { + pub fn new( + w: &'a mut W, + t: DisplayLevel, + bottom_up: bool, + subgraph_options: Option, + ) -> Self { Self { output: w, t, + bottom_up, nodes: IndexMap::new(), node_count: 0, subgraph_options, @@ -138,7 +152,12 @@ where writeln!(self.output, "end")?; } None => { - writeln!(self.output, "flowchart TD")?; + if self.bottom_up { + writeln!(self.output, "flowchart BT")?; + } else { + writeln!(self.output, "flowchart TD")?; + } + self.fmt_node(node)?; } } diff --git a/src/common/display/src/utils.rs b/src/common/display/src/utils.rs new file mode 100644 index 0000000000..082ec0a883 --- /dev/null +++ b/src/common/display/src/utils.rs @@ -0,0 +1,18 @@ +pub fn bytes_to_human_readable(byte_count: usize) -> String { + if byte_count == 0 { + return "0 B".to_string(); + } + + const UNITS: &[&str] = &["B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB"]; + let base = byte_count.ilog2() / 10; // log2(1024) = 10 + + let index = std::cmp::min(base, (UNITS.len() - 1) as u32); + let basis = 1usize << (10 * index); + let scaled_value = (byte_count as f64) / (basis as f64); + let unit = UNITS.get(index as usize).unwrap(); + if index == 0 { + format!("{byte_count} {unit}") + } else { + format!("{scaled_value:.2} {unit}") + } +} diff --git a/src/daft-io/src/stats.rs b/src/daft-io/src/stats.rs index f3eb85b5d7..0fc15be4d3 100644 --- a/src/daft-io/src/stats.rs +++ b/src/daft-io/src/stats.rs @@ -61,7 +61,7 @@ impl IOStatsContext { } #[inline] - pub(crate) fn mark_get_requests(&self, num_requests: usize) { + pub fn mark_get_requests(&self, num_requests: usize) { self.num_get_requests .fetch_add(num_requests, atomic::Ordering::Relaxed); } @@ -105,7 +105,7 @@ impl IOStatsContext { } #[inline] - pub(crate) fn mark_bytes_read(&self, bytes_read: usize) { + pub fn mark_bytes_read(&self, bytes_read: usize) { self.bytes_read .fetch_add(bytes_read, atomic::Ordering::Relaxed); } diff --git a/src/daft-local-execution/Cargo.toml b/src/daft-local-execution/Cargo.toml index 509b73a0ee..211c0ac627 100644 --- a/src/daft-local-execution/Cargo.toml +++ b/src/daft-local-execution/Cargo.toml @@ -1,6 +1,7 @@ [dependencies] async-stream = {workspace = true} async-trait = {workspace = true} +common-display = {path = "../common/display", default-features = false} common-error = {path = "../common/error", default-features = false} common-tracing = {path = "../common/tracing", default-features = false} daft-core = {path = "../daft-core", default-features = false} @@ -17,13 +18,14 @@ daft-stats = {path = "../daft-stats", default-features = false} daft-table = {path = "../daft-table", default-features = false} futures = {workspace = true} lazy_static = {workspace = true} +num-format = "0.4.4" pyo3 = {workspace = true, optional = true} snafu = {workspace = true} tokio = {workspace = true} tracing = {workspace = true} [features] -python = ["dep:pyo3", "common-error/python", "daft-dsl/python", "daft-io/python", "daft-micropartition/python", "daft-plan/python", "daft-scan/python"] +python = ["dep:pyo3", "common-error/python", "daft-dsl/python", "daft-io/python", "daft-micropartition/python", "daft-plan/python", "daft-scan/python", "common-display/python"] [package] edition = {workspace = true} diff --git a/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs b/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs index e15309a0b6..f924a72825 100644 --- a/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs +++ b/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use common_display::tree::TreeDisplay; use common_error::DaftResult; use daft_micropartition::MicroPartition; use tracing::{info_span, instrument}; @@ -12,20 +13,20 @@ use crate::{ SingleSender, }, pipeline::PipelineNode, + runtime_stats::{CountingSender, RuntimeStatsContext}, ExecutionRuntimeHandle, NUM_CPUS, }; use super::state::OperatorTaskState; pub trait IntermediateOperator: Send + Sync { fn execute(&self, input: &Arc) -> DaftResult>; - #[allow(dead_code)] - fn name(&self) -> &'static str; } pub(crate) struct IntermediateNode { intermediate_op: Arc, children: Vec>, + runtime_stats: Arc, } impl IntermediateNode { @@ -33,9 +34,19 @@ impl IntermediateNode { intermediate_op: Arc, children: Vec>, ) -> Self { - IntermediateNode { + let rts = RuntimeStatsContext::new(); + Self::new_with_runtime_stats(intermediate_op, children, rts) + } + + pub(crate) fn new_with_runtime_stats( + intermediate_op: Arc, + children: Vec>, + runtime_stats: Arc, + ) -> Self { + Self { intermediate_op, children, + runtime_stats, } } @@ -48,11 +59,14 @@ impl IntermediateNode { op: Arc, mut receiver: SingleReceiver, sender: SingleSender, + rt_context: Arc, ) -> DaftResult<()> { let mut state = OperatorTaskState::new(); let span = info_span!("IntermediateOp::execute"); + let sender = CountingSender::new(sender, rt_context.clone()); while let Some(morsel) = receiver.recv().await { - let result = span.in_scope(|| op.execute(&morsel))?; + rt_context.mark_rows_received(morsel.len() as u64); + let result = rt_context.in_span(&span, || op.execute(&morsel))?; state.add(result); if let Some(part) = state.try_clear() { let _ = sender.send(part?).await; @@ -78,6 +92,7 @@ impl IntermediateNode { self.intermediate_op.clone(), worker_receiver, destination_sender, + self.runtime_stats.clone(), )); worker_senders.push(worker_sender); } @@ -102,12 +117,37 @@ impl IntermediateNode { } } +impl TreeDisplay for IntermediateNode { + fn display_as(&self, level: common_display::DisplayLevel) -> String { + use std::fmt::Write; + let mut display = String::new(); + writeln!(display, "{}", self.intermediate_op.name()).unwrap(); + use common_display::DisplayLevel::*; + match level { + Compact => {} + _ => { + let rt_result = self.runtime_stats.result(); + rt_result.display(&mut display, true, true, true).unwrap(); + } + } + display + } + + fn get_children(&self) -> Vec<&dyn TreeDisplay> { + self.children.iter().map(|v| v.as_tree_display()).collect() + } +} + #[async_trait] impl PipelineNode for IntermediateNode { fn children(&self) -> Vec<&dyn PipelineNode> { self.children.iter().map(|v| v.as_ref()).collect() } + fn name(&self) -> &'static str { + self.intermediate_op.name() + } + async fn start( &mut self, mut destination: MultiSender, @@ -129,4 +169,7 @@ impl PipelineNode for IntermediateNode { runtime_handle.spawn(Self::send_to_workers(receiver, worker_senders)); Ok(()) } + fn as_tree_display(&self) -> &dyn TreeDisplay { + self + } } diff --git a/src/daft-local-execution/src/lib.rs b/src/daft-local-execution/src/lib.rs index 445d8203f7..99958c3ecb 100644 --- a/src/daft-local-execution/src/lib.rs +++ b/src/daft-local-execution/src/lib.rs @@ -3,9 +3,9 @@ mod channel; mod intermediate_ops; mod pipeline; mod run; +mod runtime_stats; mod sinks; mod sources; - use common_error::{DaftError, DaftResult}; use lazy_static::lazy_static; pub use run::NativeExecutor; diff --git a/src/daft-local-execution/src/pipeline.rs b/src/daft-local-execution/src/pipeline.rs index d66e8b6421..9d5ad6143d 100644 --- a/src/daft-local-execution/src/pipeline.rs +++ b/src/daft-local-execution/src/pipeline.rs @@ -18,6 +18,7 @@ use crate::{ }; use async_trait::async_trait; +use common_display::{mermaid::MermaidDisplayVisitor, tree::TreeDisplay}; use common_error::DaftResult; use daft_dsl::Expr; use daft_micropartition::MicroPartition; @@ -30,13 +31,28 @@ use daft_plan::populate_aggregation_stages; use crate::channel::MultiSender; #[async_trait] -pub trait PipelineNode: Sync + Send { +pub trait PipelineNode: Sync + Send + TreeDisplay { fn children(&self) -> Vec<&dyn PipelineNode>; + fn name(&self) -> &'static str; async fn start( &mut self, destination: MultiSender, runtime_handle: &mut ExecutionRuntimeHandle, ) -> DaftResult<()>; + + fn as_tree_display(&self) -> &dyn TreeDisplay; +} + +pub(crate) fn viz_pipeline(root: &dyn PipelineNode) -> String { + let mut output = String::new(); + let mut visitor = MermaidDisplayVisitor::new( + &mut output, + common_display::DisplayLevel::Default, + true, + Default::default(), + ); + visitor.fmt(root.as_tree_display()).unwrap(); + output } pub fn physical_plan_to_pipeline( diff --git a/src/daft-local-execution/src/run.rs b/src/daft-local-execution/src/run.rs index 1f8248c9ec..36050f9165 100644 --- a/src/daft-local-execution/src/run.rs +++ b/src/daft-local-execution/src/run.rs @@ -1,9 +1,12 @@ use std::{ collections::HashMap, + fs::File, + io::Write, sync::{ atomic::{AtomicUsize, Ordering}, Arc, }, + time::{SystemTime, UNIX_EPOCH}, }; use common_error::DaftResult; @@ -19,7 +22,9 @@ use { }; use crate::{ - channel::create_channel, pipeline::physical_plan_to_pipeline, Error, ExecutionRuntimeHandle, + channel::create_channel, + pipeline::{physical_plan_to_pipeline, viz_pipeline}, + Error, ExecutionRuntimeHandle, }; #[cfg(feature = "python")] @@ -88,6 +93,17 @@ impl NativeExecutor { } } +fn should_enable_explain_analyze() -> bool { + let explain_var_name = "DAFT_DEV_ENABLE_EXPLAIN_ANALYZE"; + if let Ok(val) = std::env::var(explain_var_name) + && matches!(val.trim().to_lowercase().as_str(), "1" | "true") + { + true + } else { + false + } +} + pub fn run_local( physical_plan: &LocalPhysicalPlan, psets: HashMap>>, @@ -128,6 +144,15 @@ pub fn run_local( _ => {} } } + if should_enable_explain_analyze() { + let curr_ms = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Time went backwards") + .as_millis(); + let file_name = format!("explain-analyze-{}-mermaid.md", curr_ms); + let mut file = File::create(file_name)?; + writeln!(file, "```mermaid\n{}\n```", viz_pipeline(pipeline.as_ref()))?; + } Ok(result.into_iter()) }); Ok(Box::new(res?)) diff --git a/src/daft-local-execution/src/runtime_stats.rs b/src/daft-local-execution/src/runtime_stats.rs new file mode 100644 index 0000000000..e3e80886d2 --- /dev/null +++ b/src/daft-local-execution/src/runtime_stats.rs @@ -0,0 +1,129 @@ +use core::fmt; +use std::{ + fmt::Write, + sync::{atomic::AtomicU64, Arc}, + time::Instant, +}; + +use daft_micropartition::MicroPartition; +use tokio::sync::mpsc::error::SendError; + +use crate::channel::SingleSender; + +#[derive(Default)] +pub(crate) struct RuntimeStatsContext { + rows_received: AtomicU64, + rows_emitted: AtomicU64, + cpu_us: AtomicU64, +} + +#[derive(Debug)] +pub(crate) struct RuntimeStats { + pub rows_received: u64, + pub rows_emitted: u64, + pub cpu_us: u64, +} + +impl RuntimeStats { + pub(crate) fn display( + &self, + w: &mut W, + received: bool, + emitted: bool, + cpu_time: bool, + ) -> Result<(), fmt::Error> { + use num_format::Locale; + use num_format::ToFormattedString; + if received { + writeln!( + w, + "rows received = {}", + self.rows_received.to_formatted_string(&Locale::en) + )?; + } + + if emitted { + writeln!( + w, + "rows emitted = {}", + self.rows_emitted.to_formatted_string(&Locale::en) + )?; + } + + if cpu_time { + let tms = (self.cpu_us as f32) / 1000f32; + writeln!(w, "CPU Time = {:.2}ms", tms)?; + } + + Ok(()) + } +} + +impl RuntimeStatsContext { + pub(crate) fn new() -> Arc { + Arc::new(Self { + rows_received: AtomicU64::new(0), + rows_emitted: AtomicU64::new(0), + cpu_us: AtomicU64::new(0), + }) + } + pub(crate) fn in_span T, T>(&self, span: &tracing::Span, f: F) -> T { + let _enter = span.enter(); + let start = Instant::now(); + let result = f(); + let total = start.elapsed(); + let micros = total.as_micros() as u64; + self.cpu_us + .fetch_add(micros, std::sync::atomic::Ordering::Relaxed); + result + } + + pub(crate) fn mark_rows_received(&self, rows: u64) { + self.rows_received + .fetch_add(rows, std::sync::atomic::Ordering::Relaxed); + } + + pub(crate) fn mark_rows_emitted(&self, rows: u64) { + self.rows_emitted + .fetch_add(rows, std::sync::atomic::Ordering::Relaxed); + } + #[allow(unused)] + pub(crate) fn reset(&self) { + self.rows_received + .store(0, std::sync::atomic::Ordering::Release); + self.rows_emitted + .store(0, std::sync::atomic::Ordering::Release); + self.cpu_us.store(0, std::sync::atomic::Ordering::Release); + } + + pub(crate) fn result(&self) -> RuntimeStats { + RuntimeStats { + rows_received: self + .rows_received + .load(std::sync::atomic::Ordering::Relaxed), + rows_emitted: self.rows_emitted.load(std::sync::atomic::Ordering::Relaxed), + cpu_us: self.cpu_us.load(std::sync::atomic::Ordering::Relaxed), + } + } +} + +pub(crate) struct CountingSender { + sender: SingleSender, + rt: Arc, +} + +impl CountingSender { + pub(crate) fn new(sender: SingleSender, rt: Arc) -> Self { + Self { sender, rt } + } + #[inline] + pub(crate) async fn send( + &self, + v: Arc, + ) -> Result<(), SendError>> { + let len = v.len(); + self.sender.send(v).await?; + self.rt.mark_rows_emitted(len as u64); + Ok(()) + } +} diff --git a/src/daft-local-execution/src/sinks/blocking_sink.rs b/src/daft-local-execution/src/sinks/blocking_sink.rs index 044d7011eb..91168efcdb 100644 --- a/src/daft-local-execution/src/sinks/blocking_sink.rs +++ b/src/daft-local-execution/src/sinks/blocking_sink.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use common_display::tree::TreeDisplay; use common_error::DaftResult; use daft_micropartition::MicroPartition; use tracing::info_span; @@ -7,6 +8,7 @@ use tracing::info_span; use crate::{ channel::{create_channel, MultiSender}, pipeline::PipelineNode, + runtime_stats::RuntimeStatsContext, ExecutionRuntimeHandle, NUM_CPUS, }; use async_trait::async_trait; @@ -19,21 +21,25 @@ pub enum BlockingSinkStatus { pub trait BlockingSink: Send + Sync { fn sink(&mut self, input: &Arc) -> DaftResult; fn finalize(&mut self) -> DaftResult>>; - #[allow(dead_code)] fn name(&self) -> &'static str; } pub(crate) struct BlockingSinkNode { // use a RW lock op: Arc>>, + name: &'static str, child: Box, + runtime_stats: Arc, } impl BlockingSinkNode { pub(crate) fn new(op: Box, child: Box) -> Self { + let name = op.name(); BlockingSinkNode { op: Arc::new(tokio::sync::Mutex::new(op)), + name, child, + runtime_stats: RuntimeStatsContext::new(), } } pub(crate) fn boxed(self) -> Box { @@ -41,12 +47,37 @@ impl BlockingSinkNode { } } +impl TreeDisplay for BlockingSinkNode { + fn display_as(&self, level: common_display::DisplayLevel) -> String { + use std::fmt::Write; + let mut display = String::new(); + writeln!(display, "{}", self.name()).unwrap(); + use common_display::DisplayLevel::*; + match level { + Compact => {} + _ => { + let rt_result = self.runtime_stats.result(); + rt_result.display(&mut display, true, true, true).unwrap(); + } + } + display + } + + fn get_children(&self) -> Vec<&dyn TreeDisplay> { + vec![self.child.as_tree_display()] + } +} + #[async_trait] impl PipelineNode for BlockingSinkNode { fn children(&self) -> Vec<&dyn PipelineNode> { vec![self.child.as_ref()] } + fn name(&self) -> &'static str { + self.name + } + async fn start( &mut self, mut destination: MultiSender, @@ -57,21 +88,33 @@ impl PipelineNode for BlockingSinkNode { let child = self.child.as_mut(); child.start(sender, runtime_handle).await?; let op = self.op.clone(); + + let rt_context = self.runtime_stats.clone(); runtime_handle.spawn(async move { let span = info_span!("BlockingSinkNode::execute"); let mut guard = op.lock().await; while let Some(val) = streaming_receiver.recv().await { - if let BlockingSinkStatus::Finished = span.in_scope(|| guard.sink(&val))? { + rt_context.mark_rows_received(val.len() as u64); + if let BlockingSinkStatus::Finished = + rt_context.in_span(&span, || guard.sink(&val))? + { break; } } - let finalized_result = - info_span!("BlockingSinkNode::finalize").in_scope(|| guard.finalize())?; + let finalized_result = rt_context + .in_span(&info_span!("BlockingSinkNode::finalize"), || { + guard.finalize() + })?; if let Some(part) = finalized_result { + let len = part.len(); let _ = destination.get_next_sender().send(part).await; + rt_context.mark_rows_emitted(len as u64); } Ok(()) }); Ok(()) } + fn as_tree_display(&self) -> &dyn TreeDisplay { + self + } } diff --git a/src/daft-local-execution/src/sinks/hash_join.rs b/src/daft-local-execution/src/sinks/hash_join.rs index f3aa30c7a6..b37496d011 100644 --- a/src/daft-local-execution/src/sinks/hash_join.rs +++ b/src/daft-local-execution/src/sinks/hash_join.rs @@ -4,9 +4,11 @@ use crate::{ channel::{create_channel, MultiSender}, intermediate_ops::intermediate_op::{IntermediateNode, IntermediateOperator}, pipeline::PipelineNode, + runtime_stats::RuntimeStatsContext, ExecutionRuntimeHandle, NUM_CPUS, }; use async_trait::async_trait; +use common_display::tree::TreeDisplay; use common_error::DaftResult; use daft_core::{ datatypes::Field, @@ -227,6 +229,8 @@ pub(crate) struct HashJoinNode { hash_join: Arc>, left: Box, right: Box, + build_runtime_stats: Arc, + probe_runtime_stats: Arc, } impl HashJoinNode { @@ -239,6 +243,8 @@ impl HashJoinNode { hash_join: Arc::new(tokio::sync::Mutex::new(op)), left, right, + build_runtime_stats: RuntimeStatsContext::new(), + probe_runtime_stats: RuntimeStatsContext::new(), } } pub(crate) fn boxed(self) -> Box { @@ -246,12 +252,46 @@ impl HashJoinNode { } } +impl TreeDisplay for HashJoinNode { + fn display_as(&self, level: common_display::DisplayLevel) -> String { + use std::fmt::Write; + let mut display = String::new(); + writeln!(display, "{}", self.name()).unwrap(); + use common_display::DisplayLevel::*; + match level { + Compact => {} + _ => { + let build_rt_result = self.build_runtime_stats.result(); + writeln!(display, "Probe Table Build:").unwrap(); + + build_rt_result + .display(&mut display, true, false, true) + .unwrap(); + + let probe_rt_result = self.probe_runtime_stats.result(); + writeln!(display, "\nProbe Phase:").unwrap(); + probe_rt_result + .display(&mut display, true, true, true) + .unwrap(); + } + } + display + } + fn get_children(&self) -> Vec<&dyn TreeDisplay> { + vec![self.left.as_tree_display(), self.right.as_tree_display()] + } +} + #[async_trait] impl PipelineNode for HashJoinNode { fn children(&self) -> Vec<&dyn PipelineNode> { vec![self.left.as_ref(), self.right.as_ref()] } + fn name(&self) -> &'static str { + "HashJoin" + } + async fn start( &mut self, mut destination: MultiSender, @@ -260,18 +300,20 @@ impl PipelineNode for HashJoinNode { let (sender, mut pt_receiver) = create_channel(*NUM_CPUS, false); self.left.start(sender, runtime_handle).await?; let hash_join = self.hash_join.clone(); - + let build_runtime_stats = self.build_runtime_stats.clone(); let probe_table_build = tokio::spawn(async move { let span = info_span!("ProbeTable::sink"); let mut guard = hash_join.lock().await; let sink = guard.as_sink(); while let Some(val) = pt_receiver.recv().await { - if let BlockingSinkStatus::Finished = span.in_scope(|| sink.sink(&val))? { + build_runtime_stats.mark_rows_received(val.len() as u64); + if let BlockingSinkStatus::Finished = + build_runtime_stats.in_span(&span, || sink.sink(&val))? + { break; } } - - info_span!("ProbeTable::finalize").in_scope(|| sink.finalize())?; + build_runtime_stats.in_span(&info_span!("ProbeTable::finalize"), || sink.finalize())?; DaftResult::Ok(()) }); // should wrap in context join handle @@ -287,7 +329,11 @@ impl PipelineNode for HashJoinNode { let guard = hash_join.lock().await; guard.as_intermediate_op() }; - let probing_node = IntermediateNode::new(probing_op, vec![]); + let probing_node = IntermediateNode::new_with_runtime_stats( + probing_op, + vec![], + self.probe_runtime_stats.clone(), + ); let worker_senders = probing_node .spawn_workers(&mut destination, runtime_handle) .await; @@ -297,4 +343,8 @@ impl PipelineNode for HashJoinNode { )); Ok(()) } + + fn as_tree_display(&self) -> &dyn TreeDisplay { + self + } } diff --git a/src/daft-local-execution/src/sinks/sort.rs b/src/daft-local-execution/src/sinks/sort.rs index eacee7d10b..0a8ef48b5c 100644 --- a/src/daft-local-execution/src/sinks/sort.rs +++ b/src/daft-local-execution/src/sinks/sort.rs @@ -42,9 +42,7 @@ impl BlockingSink for SortSink { } Ok(BlockingSinkStatus::NeedMoreInput) } - fn name(&self) -> &'static str { - "Sort" - } + #[instrument(skip_all, name = "SortSink::finalize")] fn finalize(&mut self) -> DaftResult>> { if let SortState::Building(parts) = &mut self.state { @@ -61,4 +59,7 @@ impl BlockingSink for SortSink { panic!("SortSink should be in Building state"); } } + fn name(&self) -> &'static str { + "SortResult" + } } diff --git a/src/daft-local-execution/src/sinks/streaming_sink.rs b/src/daft-local-execution/src/sinks/streaming_sink.rs index e73733298e..7eabc7edad 100644 --- a/src/daft-local-execution/src/sinks/streaming_sink.rs +++ b/src/daft-local-execution/src/sinks/streaming_sink.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use common_display::tree::TreeDisplay; use common_error::DaftResult; use daft_micropartition::MicroPartition; use tracing::info_span; @@ -7,6 +8,7 @@ use tracing::info_span; use crate::{ channel::{create_channel, MultiSender}, pipeline::PipelineNode, + runtime_stats::RuntimeStatsContext, ExecutionRuntimeHandle, NUM_CPUS, }; use async_trait::async_trait; @@ -30,14 +32,19 @@ pub trait StreamingSink: Send + Sync { pub(crate) struct StreamingSinkNode { // use a RW lock op: Arc>>, + name: &'static str, children: Vec>, + runtime_stats: Arc, } impl StreamingSinkNode { pub(crate) fn new(op: Box, children: Vec>) -> Self { + let name = op.name(); StreamingSinkNode { op: Arc::new(tokio::sync::Mutex::new(op)), + name, children, + runtime_stats: RuntimeStatsContext::new(), } } pub(crate) fn boxed(self) -> Box { @@ -45,12 +52,39 @@ impl StreamingSinkNode { } } +impl TreeDisplay for StreamingSinkNode { + fn display_as(&self, level: common_display::DisplayLevel) -> String { + use std::fmt::Write; + let mut display = String::new(); + writeln!(display, "{}", self.name()).unwrap(); + use common_display::DisplayLevel::*; + match level { + Compact => {} + _ => { + let rt_result = self.runtime_stats.result(); + rt_result.display(&mut display, true, true, true).unwrap(); + } + } + display + } + fn get_children(&self) -> Vec<&dyn TreeDisplay> { + self.children() + .iter() + .map(|v| v.as_tree_display()) + .collect() + } +} + #[async_trait] impl PipelineNode for StreamingSinkNode { fn children(&self) -> Vec<&dyn PipelineNode> { self.children.iter().map(|v| v.as_ref()).collect() } + fn name(&self) -> &'static str { + self.name + } + async fn start( &mut self, mut destination: MultiSender, @@ -64,6 +98,7 @@ impl PipelineNode for StreamingSinkNode { .expect("we should only have 1 child"); child.start(sender, runtime_handle).await?; let op = self.op.clone(); + let runtime_stats = self.runtime_stats.clone(); runtime_handle.spawn(async move { // this should be a RWLock and run in concurrent workers let span = info_span!("StreamingSink::execute"); @@ -71,24 +106,31 @@ impl PipelineNode for StreamingSinkNode { let mut sink = op.lock().await; let mut is_active = true; while is_active && let Some(val) = streaming_receiver.recv().await { + runtime_stats.mark_rows_received(val.len() as u64); loop { - let result = span.in_scope(|| sink.execute(0, &val))?; + let result = runtime_stats.in_span(&span, || sink.execute(0, &val))?; match result { StreamSinkOutput::HasMoreOutput(mp) => { + let len = mp.len() as u64; let sender = destination.get_next_sender(); sender.send(mp).await.unwrap(); + runtime_stats.mark_rows_emitted(len); } StreamSinkOutput::NeedMoreInput(mp) => { if let Some(mp) = mp { + let len = mp.len() as u64; let sender = destination.get_next_sender(); sender.send(mp).await.unwrap(); + runtime_stats.mark_rows_emitted(len); } break; } StreamSinkOutput::Finished(mp) => { if let Some(mp) = mp { + let len = mp.len() as u64; let sender = destination.get_next_sender(); sender.send(mp).await.unwrap(); + runtime_stats.mark_rows_emitted(len); } is_active = false; break; @@ -100,4 +142,7 @@ impl PipelineNode for StreamingSinkNode { }); Ok(()) } + fn as_tree_display(&self) -> &dyn TreeDisplay { + self + } } diff --git a/src/daft-local-execution/src/sources/in_memory.rs b/src/daft-local-execution/src/sources/in_memory.rs index 3abf67dc67..6ab1724297 100644 --- a/src/daft-local-execution/src/sources/in_memory.rs +++ b/src/daft-local-execution/src/sources/in_memory.rs @@ -1,7 +1,8 @@ use std::sync::Arc; -use crate::{channel::MultiSender, ExecutionRuntimeHandle}; +use crate::{channel::MultiSender, runtime_stats::RuntimeStatsContext, ExecutionRuntimeHandle}; use common_error::DaftResult; +use daft_io::IOStatsRef; use daft_micropartition::MicroPartition; use tracing::instrument; @@ -26,14 +27,21 @@ impl Source for InMemorySource { &self, mut destination: MultiSender, runtime_handle: &mut ExecutionRuntimeHandle, + runtime_stats: Arc, + _io_stats: IOStatsRef, ) -> DaftResult<()> { let data = self.data.clone(); runtime_handle.spawn(async move { for part in data { + let len = part.len(); let _ = destination.get_next_sender().send(part).await; + runtime_stats.mark_rows_emitted(len as u64); } Ok(()) }); Ok(()) } + fn name(&self) -> &'static str { + "InMemory" + } } diff --git a/src/daft-local-execution/src/sources/scan_task.rs b/src/daft-local-execution/src/sources/scan_task.rs index 6f62991ba5..56c8b2530d 100644 --- a/src/daft-local-execution/src/sources/scan_task.rs +++ b/src/daft-local-execution/src/sources/scan_task.rs @@ -1,7 +1,7 @@ use common_error::DaftResult; use daft_core::schema::SchemaRef; use daft_csv::{CsvConvertOptions, CsvParseOptions, CsvReadOptions}; -use daft_io::{IOStatsContext, IOStatsRef}; +use daft_io::IOStatsRef; use daft_json::{JsonConvertOptions, JsonParseOptions, JsonReadOptions}; use daft_micropartition::MicroPartition; use daft_parquet::read::ParquetSchemaInferenceOptions; @@ -17,6 +17,7 @@ use std::sync::Arc; use crate::{ channel::{MultiSender, SingleSender}, + runtime_stats::{CountingSender, RuntimeStatsContext}, ExecutionRuntimeHandle, DEFAULT_MORSEL_SIZE, }; @@ -24,7 +25,6 @@ use super::source::{Source, SourceStream}; use tracing::instrument; -#[derive(Debug)] pub struct ScanTaskSource { scan_tasks: Vec>, } @@ -44,10 +44,13 @@ impl ScanTaskSource { sender: SingleSender, morsel_size: usize, maintain_order: bool, + io_stats: IOStatsRef, + runtime_stats: Arc, ) -> DaftResult<()> { - let io_stats = IOStatsContext::new("StreamScanTask"); let mut stream = stream_scan_task(scan_task, Some(io_stats), maintain_order, morsel_size).await?; + let sender = CountingSender::new(sender, runtime_stats.clone()); + while let Some(partition) = stream.next().await { let _ = sender.send(partition?).await; } @@ -63,6 +66,8 @@ impl Source for ScanTaskSource { &self, mut destination: MultiSender, runtime_handle: &mut ExecutionRuntimeHandle, + runtime_stats: Arc, + io_stats: IOStatsRef, ) -> DaftResult<()> { let morsel_size = DEFAULT_MORSEL_SIZE; let maintain_order = destination.in_order(); @@ -73,10 +78,16 @@ impl Source for ScanTaskSource { sender, morsel_size, maintain_order, + io_stats.clone(), + runtime_stats.clone(), )); } Ok(()) } + + fn name(&self) -> &'static str { + "ScanTask" + } } async fn stream_scan_task( diff --git a/src/daft-local-execution/src/sources/source.rs b/src/daft-local-execution/src/sources/source.rs index 7cbe5ccbda..543cafe94b 100644 --- a/src/daft-local-execution/src/sources/source.rs +++ b/src/daft-local-execution/src/sources/source.rs @@ -1,29 +1,74 @@ use std::sync::Arc; +use common_display::{tree::TreeDisplay, utils::bytes_to_human_readable}; use common_error::DaftResult; +use daft_io::{IOStatsContext, IOStatsRef}; use daft_micropartition::MicroPartition; use futures::stream::BoxStream; use async_trait::async_trait; -use crate::{channel::MultiSender, pipeline::PipelineNode, ExecutionRuntimeHandle}; +use crate::{ + channel::MultiSender, pipeline::PipelineNode, runtime_stats::RuntimeStatsContext, + ExecutionRuntimeHandle, +}; pub type SourceStream<'a> = BoxStream<'a, DaftResult>>; -pub trait Source: Send + Sync { +pub(crate) trait Source: Send + Sync { + fn name(&self) -> &'static str; fn get_data( &self, destination: MultiSender, runtime_handle: &mut ExecutionRuntimeHandle, + runtime_stats: Arc, + io_stats: IOStatsRef, ) -> DaftResult<()>; } struct SourceNode { source: Box, + runtime_stats: Arc, + io_stats: IOStatsRef, +} + +impl TreeDisplay for SourceNode { + fn display_as(&self, level: common_display::DisplayLevel) -> String { + use std::fmt::Write; + let mut display = String::new(); + writeln!(display, "{}", self.name()).unwrap(); + use common_display::DisplayLevel::*; + match level { + Compact => {} + _ => { + let rt_result = self.runtime_stats.result(); + + writeln!(display).unwrap(); + rt_result.display(&mut display, false, true, false).unwrap(); + let bytes_read = self.io_stats.load_bytes_read(); + writeln!( + display, + "bytes read = {}", + bytes_to_human_readable(bytes_read) + ) + .unwrap(); + } + } + display + } + fn get_children(&self) -> Vec<&dyn TreeDisplay> { + self.children() + .iter() + .map(|v| v.as_tree_display()) + .collect() + } } #[async_trait] impl PipelineNode for SourceNode { + fn name(&self) -> &'static str { + self.source.name() + } fn children(&self) -> Vec<&dyn PipelineNode> { vec![] } @@ -32,12 +77,25 @@ impl PipelineNode for SourceNode { destination: MultiSender, runtime_handle: &mut ExecutionRuntimeHandle, ) -> DaftResult<()> { - self.source.get_data(destination, runtime_handle) + self.source.get_data( + destination, + runtime_handle, + self.runtime_stats.clone(), + self.io_stats.clone(), + ) + } + fn as_tree_display(&self) -> &dyn TreeDisplay { + self } } impl From> for Box { fn from(source: Box) -> Self { - Box::new(SourceNode { source }) + let name = source.name(); + Box::new(SourceNode { + source, + runtime_stats: RuntimeStatsContext::new(), + io_stats: IOStatsContext::new(name), + }) } } diff --git a/src/daft-parquet/src/read.rs b/src/daft-parquet/src/read.rs index 1ddad000a8..1a430628e8 100644 --- a/src/daft-parquet/src/read.rs +++ b/src/daft-parquet/src/read.rs @@ -370,6 +370,7 @@ async fn stream_parquet_single( schema_infer_options, metadata, maintain_order, + io_stats, ) } else { let builder = ParquetReaderBuilder::from_uri( diff --git a/src/daft-parquet/src/stream_reader.rs b/src/daft-parquet/src/stream_reader.rs index 9939b27d61..d7572d4991 100644 --- a/src/daft-parquet/src/stream_reader.rs +++ b/src/daft-parquet/src/stream_reader.rs @@ -1,4 +1,9 @@ -use std::{collections::HashSet, fs::File, sync::Arc}; +use std::{ + collections::HashSet, + fs::File, + io::{Read, Seek}, + sync::Arc, +}; use arrow2::io::parquet::read; use common_error::DaftResult; @@ -8,6 +13,7 @@ use daft_core::{ Series, }; use daft_dsl::ExprRef; +use daft_io::IOStatsRef; use daft_table::Table; use futures::{stream::BoxStream, StreamExt}; use itertools::Itertools; @@ -118,6 +124,61 @@ pub(crate) fn arrow_column_iters_to_table_iter( }) } +struct CountingReader { + reader: R, + count: usize, + io_stats: Option, +} + +impl CountingReader { + fn update_count(&mut self) { + if let Some(ios) = &self.io_stats { + ios.mark_bytes_read(self.count); + self.count = 0; + } + } +} + +impl Read for CountingReader +where + R: Read + Seek, +{ + #[inline] + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + let read = self.reader.read(buf)?; + self.count += read; + Ok(read) + } + #[inline] + fn read_vectored(&mut self, bufs: &mut [std::io::IoSliceMut<'_>]) -> std::io::Result { + let read = self.reader.read_vectored(bufs)?; + self.count += read; + Ok(read) + } + #[inline] + fn read_to_end(&mut self, buf: &mut Vec) -> std::io::Result { + let read = self.reader.read_to_end(buf)?; + self.count += read; + Ok(read) + } +} + +impl Seek for CountingReader +where + R: Read + Seek, +{ + #[inline] + fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result { + self.reader.seek(pos) + } +} + +impl Drop for CountingReader { + fn drop(&mut self) { + self.update_count() + } +} + #[allow(clippy::too_many_arguments)] pub(crate) fn local_parquet_read_into_column_iters( uri: &str, @@ -129,6 +190,7 @@ pub(crate) fn local_parquet_read_into_column_iters( schema_infer_options: ParquetSchemaInferenceOptions, metadata: Option>, chunk_size: usize, + io_stats: Option, ) -> super::Result<( Arc, SchemaRef, @@ -141,9 +203,10 @@ pub(crate) fn local_parquet_read_into_column_iters( .map(|s| s.to_string()) .unwrap_or(uri.to_string()); - let mut reader = File::open(uri.clone()).with_context(|_| super::InternalIOSnafu { + let reader = File::open(uri.clone()).with_context(|_| super::InternalIOSnafu { path: uri.to_string(), })?; + io_stats.as_ref().inspect(|ios| ios.mark_get_requests(1)); let size = reader .metadata() .with_context(|_| super::InternalIOSnafu { @@ -157,6 +220,11 @@ pub(crate) fn local_parquet_read_into_column_iters( file_size: size as usize, }); } + let mut reader = CountingReader { + reader, + count: 0, + io_stats, + }; let metadata = match metadata { Some(m) => m, @@ -205,9 +273,9 @@ pub(crate) fn local_parquet_read_into_column_iters( None, ) .with_context(|_| super::UnableToReadParquetRowGroupSnafu { path: uri.clone() })?; + reader.update_count(); Ok(single_rg_column_iter) }); - Ok(( metadata, Arc::new(daft_schema), @@ -428,6 +496,7 @@ pub(crate) fn local_parquet_stream( schema_infer_options: ParquetSchemaInferenceOptions, metadata: Option>, maintain_order: bool, + io_stats: Option, ) -> DaftResult<( Arc, BoxStream<'static, DaftResult>, @@ -443,6 +512,7 @@ pub(crate) fn local_parquet_stream( schema_infer_options, metadata, chunk_size, + io_stats, )?; // Create a channel for each row group to send the processed tables to the stream diff --git a/src/daft-plan/src/display.rs b/src/daft-plan/src/display.rs index dc77c80dae..5af4c9034b 100644 --- a/src/daft-plan/src/display.rs +++ b/src/daft-plan/src/display.rs @@ -283,6 +283,7 @@ Project1 --> Limit0 let plan = plan_1(); let opts = MermaidDisplayOptions { simple: false, + bottom_up: false, subgraph_options: Some(SubgraphOptions { name: "Optimized Logical Plan".to_string(), subgraph_id: "optimized".to_string(),