From 39cd0ad56dce4b284e557dcde4e5b4c09487d754 Mon Sep 17 00:00:00 2001 From: Colin Ho Date: Fri, 16 Aug 2024 17:02:40 -0700 Subject: [PATCH] [CHORE] Add error snafus for local executor (#2660) Example: ``` df = daft.from_pydict({"a": ["foo", "bar", "baz"]}) df = df.select(df["a"].str.match("[")) df.show() Error when running pipeline node ProjectOperator Traceback (most recent call last): File "test.py", line 21, in df.show() File "/Users/colinho/Desktop/Daft/daft/api_annotations.py", line 26, in _wrap return timed_method(*args, **kwargs) File "/Users/colinho/Desktop/Daft/daft/analytics.py", line 185, in tracked_method return method(*args, **kwargs) File "/Users/colinho/Desktop/Daft/daft/dataframe/dataframe.py", line 2278, in show dataframe_display = self._construct_show_display(n) File "/Users/colinho/Desktop/Daft/daft/dataframe/dataframe.py", line 2235, in _construct_show_display for table in get_context().runner().run_iter_tables(builder, results_buffer_size=1): File "/Users/colinho/Desktop/Daft/daft/runners/pyrunner.py", line 216, in run_iter_tables for result in self.run_iter(builder, results_buffer_size=results_buffer_size): File "/Users/colinho/Desktop/Daft/daft/runners/pyrunner.py", line 197, in run_iter results_gen = executor.run( File "/Users/colinho/Desktop/Daft/daft/execution/native_executor.py", line 33, in run PyMaterializedResult(MicroPartition._from_pymicropartition(part)) for part in self._executor.run(psets_mp) daft.exceptions.DaftCoreException: DaftError::ValueError regex parse error: [ ^ error: unclosed character class ``` --------- Co-authored-by: Colin Ho Co-authored-by: Colin Ho --- Cargo.lock | 1 + src/daft-local-execution/Cargo.toml | 1 + .../src/intermediate_ops/intermediate_op.rs | 19 +++--- src/daft-local-execution/src/lib.rs | 35 ++++++++-- src/daft-local-execution/src/pipeline.rs | 13 ++-- src/daft-local-execution/src/run.rs | 4 +- .../src/sinks/blocking_sink.rs | 47 +++++++------ .../src/sinks/hash_join.rs | 46 +++++++------ .../src/sinks/streaming_sink.rs | 67 ++++++++++--------- .../src/sources/in_memory.rs | 22 +++--- .../src/sources/scan_task.rs | 21 +++--- .../src/sources/source.rs | 4 +- 12 files changed, 165 insertions(+), 115 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 979d34b054..dffe85cb03 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1845,6 +1845,7 @@ dependencies = [ "daft-table", "futures", "lazy_static", + "log", "num-format", "pyo3", "snafu", diff --git a/src/daft-local-execution/Cargo.toml b/src/daft-local-execution/Cargo.toml index 211c0ac627..cc0c696805 100644 --- a/src/daft-local-execution/Cargo.toml +++ b/src/daft-local-execution/Cargo.toml @@ -18,6 +18,7 @@ daft-stats = {path = "../daft-stats", default-features = false} daft-table = {path = "../daft-table", default-features = false} futures = {workspace = true} lazy_static = {workspace = true} +log = {workspace = true} num-format = "0.4.4" pyo3 = {workspace = true, optional = true} snafu = {workspace = true} diff --git a/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs b/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs index f924a72825..0dbe69f6af 100644 --- a/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs +++ b/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs @@ -88,12 +88,15 @@ impl IntermediateNode { for _ in 0..num_senders { let (worker_sender, worker_receiver) = create_single_channel(1); let destination_sender = destination.get_next_sender(); - runtime_handle.spawn(Self::run_worker( - self.intermediate_op.clone(), - worker_receiver, - destination_sender, - self.runtime_stats.clone(), - )); + runtime_handle.spawn( + Self::run_worker( + self.intermediate_op.clone(), + worker_receiver, + destination_sender, + self.runtime_stats.clone(), + ), + self.intermediate_op.name(), + ); worker_senders.push(worker_sender); } worker_senders @@ -152,7 +155,7 @@ impl PipelineNode for IntermediateNode { &mut self, mut destination: MultiSender, runtime_handle: &mut ExecutionRuntimeHandle, - ) -> DaftResult<()> { + ) -> crate::Result<()> { assert_eq!( self.children.len(), 1, @@ -166,7 +169,7 @@ impl PipelineNode for IntermediateNode { child.start(sender, runtime_handle).await?; let worker_senders = self.spawn_workers(&mut destination, runtime_handle).await; - runtime_handle.spawn(Self::send_to_workers(receiver, worker_senders)); + runtime_handle.spawn(Self::send_to_workers(receiver, worker_senders), self.name()); Ok(()) } fn as_tree_display(&self) -> &dyn TreeDisplay { diff --git a/src/daft-local-execution/src/lib.rs b/src/daft-local-execution/src/lib.rs index 99958c3ecb..a8c7545de6 100644 --- a/src/daft-local-execution/src/lib.rs +++ b/src/daft-local-execution/src/lib.rs @@ -9,14 +9,14 @@ mod sources; use common_error::{DaftError, DaftResult}; use lazy_static::lazy_static; pub use run::NativeExecutor; +use snafu::futures::TryFutureExt; use snafu::Snafu; - lazy_static! { pub static ref NUM_CPUS: usize = std::thread::available_parallelism().unwrap().get(); } pub struct ExecutionRuntimeHandle { - pub worker_set: tokio::task::JoinSet>, + pub worker_set: tokio::task::JoinSet>, } impl Default for ExecutionRuntimeHandle { @@ -34,11 +34,14 @@ impl ExecutionRuntimeHandle { pub fn spawn( &mut self, task: impl std::future::Future> + Send + 'static, + node_name: &str, ) { - self.worker_set.spawn(task); + let node_name = node_name.to_string(); + self.worker_set + .spawn(task.with_context(|_| PipelineExecutionSnafu { node_name })); } - pub async fn join_next(&mut self) -> Option, tokio::task::JoinError>> { + pub async fn join_next(&mut self) -> Option, tokio::task::JoinError>> { self.worker_set.join_next().await } @@ -63,14 +66,36 @@ pub enum Error { OneShotRecvError { source: tokio::sync::oneshot::error::RecvError, }, + #[snafu(display("Error creating pipeline from {}: {}", plan_name, source))] + PipelineCreationError { + source: DaftError, + plan_name: String, + }, + #[snafu(display("Error when running pipeline node {}: {}", node_name, source))] + PipelineExecutionError { + source: DaftError, + node_name: String, + }, } impl From for DaftError { fn from(err: Error) -> DaftError { - DaftError::External(err.into()) + match err { + Error::PipelineCreationError { source, plan_name } => { + log::error!("Error creating pipeline from {}", plan_name); + source + } + Error::PipelineExecutionError { source, node_name } => { + log::error!("Error when running pipeline node {}", node_name); + source + } + _ => DaftError::External(err.into()), + } } } +type Result = std::result::Result; + #[cfg(feature = "python")] pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> { parent.add_class::()?; diff --git a/src/daft-local-execution/src/pipeline.rs b/src/daft-local-execution/src/pipeline.rs index 9d5ad6143d..723bcc18ed 100644 --- a/src/daft-local-execution/src/pipeline.rs +++ b/src/daft-local-execution/src/pipeline.rs @@ -14,12 +14,11 @@ use crate::{ streaming_sink::StreamingSinkNode, }, sources::in_memory::InMemorySource, - ExecutionRuntimeHandle, + ExecutionRuntimeHandle, PipelineCreationSnafu, }; use async_trait::async_trait; use common_display::{mermaid::MermaidDisplayVisitor, tree::TreeDisplay}; -use common_error::DaftResult; use daft_dsl::Expr; use daft_micropartition::MicroPartition; use daft_physical_plan::{ @@ -27,6 +26,7 @@ use daft_physical_plan::{ UnGroupedAggregate, }; use daft_plan::populate_aggregation_stages; +use snafu::ResultExt; use crate::channel::MultiSender; @@ -38,7 +38,7 @@ pub trait PipelineNode: Sync + Send + TreeDisplay { &mut self, destination: MultiSender, runtime_handle: &mut ExecutionRuntimeHandle, - ) -> DaftResult<()>; + ) -> crate::Result<()>; fn as_tree_display(&self) -> &dyn TreeDisplay; } @@ -58,7 +58,7 @@ pub(crate) fn viz_pipeline(root: &dyn PipelineNode) -> String { pub fn physical_plan_to_pipeline( physical_plan: &LocalPhysicalPlan, psets: &HashMap>>, -) -> DaftResult> { +) -> crate::Result> { use crate::sources::scan_task::ScanTaskSource; use daft_physical_plan::PhysicalScan; let out: Box = match physical_plan { @@ -199,7 +199,10 @@ pub fn physical_plan_to_pipeline( *join_type, left_schema, right_schema, - )?; + ) + .with_context(|_| PipelineCreationSnafu { + plan_name: physical_plan.name(), + })?; HashJoinNode::new(sink, left_node, right_node).boxed() } _ => { diff --git a/src/daft-local-execution/src/run.rs b/src/daft-local-execution/src/run.rs index 36050f9165..e4c854f9de 100644 --- a/src/daft-local-execution/src/run.rs +++ b/src/daft-local-execution/src/run.rs @@ -121,7 +121,7 @@ pub fn run_local( .expect("Failed to create tokio runtime"); let res = runtime.block_on(async { - let mut pipeline = physical_plan_to_pipeline(physical_plan, &psets).unwrap(); + let mut pipeline = physical_plan_to_pipeline(physical_plan, &psets)?; let (sender, mut receiver) = create_channel(1, true); let mut runtime_handle = ExecutionRuntimeHandle::default(); @@ -135,7 +135,7 @@ pub fn run_local( match result { Ok(Err(e)) => { runtime_handle.shutdown().await; - return DaftResult::Err(e); + return DaftResult::Err(e.into()); } Err(e) => { runtime_handle.shutdown().await; diff --git a/src/daft-local-execution/src/sinks/blocking_sink.rs b/src/daft-local-execution/src/sinks/blocking_sink.rs index 91168efcdb..17320734ec 100644 --- a/src/daft-local-execution/src/sinks/blocking_sink.rs +++ b/src/daft-local-execution/src/sinks/blocking_sink.rs @@ -82,7 +82,7 @@ impl PipelineNode for BlockingSinkNode { &mut self, mut destination: MultiSender, runtime_handle: &mut ExecutionRuntimeHandle, - ) -> DaftResult<()> { + ) -> crate::Result<()> { let (sender, mut streaming_receiver) = create_channel(*NUM_CPUS, true); // now we can start building the right side let child = self.child.as_mut(); @@ -90,28 +90,31 @@ impl PipelineNode for BlockingSinkNode { let op = self.op.clone(); let rt_context = self.runtime_stats.clone(); - runtime_handle.spawn(async move { - let span = info_span!("BlockingSinkNode::execute"); - let mut guard = op.lock().await; - while let Some(val) = streaming_receiver.recv().await { - rt_context.mark_rows_received(val.len() as u64); - if let BlockingSinkStatus::Finished = - rt_context.in_span(&span, || guard.sink(&val))? - { - break; + runtime_handle.spawn( + async move { + let span = info_span!("BlockingSinkNode::execute"); + let mut guard = op.lock().await; + while let Some(val) = streaming_receiver.recv().await { + rt_context.mark_rows_received(val.len() as u64); + if let BlockingSinkStatus::Finished = + rt_context.in_span(&span, || guard.sink(&val))? + { + break; + } } - } - let finalized_result = rt_context - .in_span(&info_span!("BlockingSinkNode::finalize"), || { - guard.finalize() - })?; - if let Some(part) = finalized_result { - let len = part.len(); - let _ = destination.get_next_sender().send(part).await; - rt_context.mark_rows_emitted(len as u64); - } - Ok(()) - }); + let finalized_result = rt_context + .in_span(&info_span!("BlockingSinkNode::finalize"), || { + guard.finalize() + })?; + if let Some(part) = finalized_result { + let len = part.len(); + let _ = destination.get_next_sender().send(part).await; + rt_context.mark_rows_emitted(len as u64); + } + Ok(()) + }, + self.name(), + ); Ok(()) } fn as_tree_display(&self) -> &dyn TreeDisplay { diff --git a/src/daft-local-execution/src/sinks/hash_join.rs b/src/daft-local-execution/src/sinks/hash_join.rs index b37496d011..77b22a5791 100644 --- a/src/daft-local-execution/src/sinks/hash_join.rs +++ b/src/daft-local-execution/src/sinks/hash_join.rs @@ -5,7 +5,7 @@ use crate::{ intermediate_ops::intermediate_op::{IntermediateNode, IntermediateOperator}, pipeline::PipelineNode, runtime_stats::RuntimeStatsContext, - ExecutionRuntimeHandle, NUM_CPUS, + ExecutionRuntimeHandle, JoinSnafu, PipelineExecutionSnafu, NUM_CPUS, }; use async_trait::async_trait; use common_display::tree::TreeDisplay; @@ -18,6 +18,7 @@ use daft_core::{ use daft_dsl::ExprRef; use daft_micropartition::MicroPartition; use daft_plan::JoinType; +use snafu::{futures::TryFutureExt, ResultExt}; use tracing::info_span; use super::blocking_sink::{BlockingSink, BlockingSinkStatus}; @@ -296,33 +297,38 @@ impl PipelineNode for HashJoinNode { &mut self, mut destination: MultiSender, runtime_handle: &mut ExecutionRuntimeHandle, - ) -> DaftResult<()> { + ) -> crate::Result<()> { let (sender, mut pt_receiver) = create_channel(*NUM_CPUS, false); self.left.start(sender, runtime_handle).await?; let hash_join = self.hash_join.clone(); let build_runtime_stats = self.build_runtime_stats.clone(); - let probe_table_build = tokio::spawn(async move { - let span = info_span!("ProbeTable::sink"); - let mut guard = hash_join.lock().await; - let sink = guard.as_sink(); - while let Some(val) = pt_receiver.recv().await { - build_runtime_stats.mark_rows_received(val.len() as u64); - if let BlockingSinkStatus::Finished = - build_runtime_stats.in_span(&span, || sink.sink(&val))? - { - break; + let name = self.name(); + let probe_table_build = tokio::spawn( + async move { + let span = info_span!("ProbeTable::sink"); + let mut guard = hash_join.lock().await; + let sink = guard.as_sink(); + while let Some(val) = pt_receiver.recv().await { + build_runtime_stats.mark_rows_received(val.len() as u64); + if let BlockingSinkStatus::Finished = + build_runtime_stats.in_span(&span, || sink.sink(&val))? + { + break; + } } + build_runtime_stats + .in_span(&info_span!("ProbeTable::finalize"), || sink.finalize())?; + DaftResult::Ok(()) } - build_runtime_stats.in_span(&info_span!("ProbeTable::finalize"), || sink.finalize())?; - DaftResult::Ok(()) - }); + .with_context(move |_| PipelineExecutionSnafu { node_name: name }), + ); // should wrap in context join handle let (right_sender, streaming_receiver) = create_channel(*NUM_CPUS, destination.in_order()); // now we can start building the right side self.right.start(right_sender, runtime_handle).await?; - probe_table_build.await.unwrap()?; + probe_table_build.await.context(JoinSnafu {})??; let hash_join = self.hash_join.clone(); let probing_op = { @@ -337,10 +343,10 @@ impl PipelineNode for HashJoinNode { let worker_senders = probing_node .spawn_workers(&mut destination, runtime_handle) .await; - runtime_handle.spawn(IntermediateNode::send_to_workers( - streaming_receiver, - worker_senders, - )); + runtime_handle.spawn( + IntermediateNode::send_to_workers(streaming_receiver, worker_senders), + self.name(), + ); Ok(()) } diff --git a/src/daft-local-execution/src/sinks/streaming_sink.rs b/src/daft-local-execution/src/sinks/streaming_sink.rs index 7eabc7edad..76bd905c9c 100644 --- a/src/daft-local-execution/src/sinks/streaming_sink.rs +++ b/src/daft-local-execution/src/sinks/streaming_sink.rs @@ -89,7 +89,7 @@ impl PipelineNode for StreamingSinkNode { &mut self, mut destination: MultiSender, runtime_handle: &mut ExecutionRuntimeHandle, - ) -> DaftResult<()> { + ) -> crate::Result<()> { let (sender, mut streaming_receiver) = create_channel(*NUM_CPUS, destination.in_order()); // now we can start building the right side let child = self @@ -99,47 +99,50 @@ impl PipelineNode for StreamingSinkNode { child.start(sender, runtime_handle).await?; let op = self.op.clone(); let runtime_stats = self.runtime_stats.clone(); - runtime_handle.spawn(async move { - // this should be a RWLock and run in concurrent workers - let span = info_span!("StreamingSink::execute"); + runtime_handle.spawn( + async move { + // this should be a RWLock and run in concurrent workers + let span = info_span!("StreamingSink::execute"); - let mut sink = op.lock().await; - let mut is_active = true; - while is_active && let Some(val) = streaming_receiver.recv().await { - runtime_stats.mark_rows_received(val.len() as u64); - loop { - let result = runtime_stats.in_span(&span, || sink.execute(0, &val))?; - match result { - StreamSinkOutput::HasMoreOutput(mp) => { - let len = mp.len() as u64; - let sender = destination.get_next_sender(); - sender.send(mp).await.unwrap(); - runtime_stats.mark_rows_emitted(len); - } - StreamSinkOutput::NeedMoreInput(mp) => { - if let Some(mp) = mp { + let mut sink = op.lock().await; + let mut is_active = true; + while is_active && let Some(val) = streaming_receiver.recv().await { + runtime_stats.mark_rows_received(val.len() as u64); + loop { + let result = runtime_stats.in_span(&span, || sink.execute(0, &val))?; + match result { + StreamSinkOutput::HasMoreOutput(mp) => { let len = mp.len() as u64; let sender = destination.get_next_sender(); sender.send(mp).await.unwrap(); runtime_stats.mark_rows_emitted(len); } - break; - } - StreamSinkOutput::Finished(mp) => { - if let Some(mp) = mp { - let len = mp.len() as u64; - let sender = destination.get_next_sender(); - sender.send(mp).await.unwrap(); - runtime_stats.mark_rows_emitted(len); + StreamSinkOutput::NeedMoreInput(mp) => { + if let Some(mp) = mp { + let len = mp.len() as u64; + let sender = destination.get_next_sender(); + sender.send(mp).await.unwrap(); + runtime_stats.mark_rows_emitted(len); + } + break; + } + StreamSinkOutput::Finished(mp) => { + if let Some(mp) = mp { + let len = mp.len() as u64; + let sender = destination.get_next_sender(); + sender.send(mp).await.unwrap(); + runtime_stats.mark_rows_emitted(len); + } + is_active = false; + break; } - is_active = false; - break; } } } - } - DaftResult::Ok(()) - }); + DaftResult::Ok(()) + }, + self.name(), + ); Ok(()) } fn as_tree_display(&self) -> &dyn TreeDisplay { diff --git a/src/daft-local-execution/src/sources/in_memory.rs b/src/daft-local-execution/src/sources/in_memory.rs index 6ab1724297..8d02a7486d 100644 --- a/src/daft-local-execution/src/sources/in_memory.rs +++ b/src/daft-local-execution/src/sources/in_memory.rs @@ -1,7 +1,6 @@ use std::sync::Arc; use crate::{channel::MultiSender, runtime_stats::RuntimeStatsContext, ExecutionRuntimeHandle}; -use common_error::DaftResult; use daft_io::IOStatsRef; use daft_micropartition::MicroPartition; use tracing::instrument; @@ -29,16 +28,19 @@ impl Source for InMemorySource { runtime_handle: &mut ExecutionRuntimeHandle, runtime_stats: Arc, _io_stats: IOStatsRef, - ) -> DaftResult<()> { + ) -> crate::Result<()> { let data = self.data.clone(); - runtime_handle.spawn(async move { - for part in data { - let len = part.len(); - let _ = destination.get_next_sender().send(part).await; - runtime_stats.mark_rows_emitted(len as u64); - } - Ok(()) - }); + runtime_handle.spawn( + async move { + for part in data { + let len = part.len(); + let _ = destination.get_next_sender().send(part).await; + runtime_stats.mark_rows_emitted(len as u64); + } + Ok(()) + }, + self.name(), + ); Ok(()) } fn name(&self) -> &'static str { diff --git a/src/daft-local-execution/src/sources/scan_task.rs b/src/daft-local-execution/src/sources/scan_task.rs index 56c8b2530d..6f62e3567c 100644 --- a/src/daft-local-execution/src/sources/scan_task.rs +++ b/src/daft-local-execution/src/sources/scan_task.rs @@ -68,19 +68,22 @@ impl Source for ScanTaskSource { runtime_handle: &mut ExecutionRuntimeHandle, runtime_stats: Arc, io_stats: IOStatsRef, - ) -> DaftResult<()> { + ) -> crate::Result<()> { let morsel_size = DEFAULT_MORSEL_SIZE; let maintain_order = destination.in_order(); for scan_task in self.scan_tasks.clone() { let sender = destination.get_next_sender(); - runtime_handle.spawn(Self::process_scan_task_stream( - scan_task, - sender, - morsel_size, - maintain_order, - io_stats.clone(), - runtime_stats.clone(), - )); + runtime_handle.spawn( + Self::process_scan_task_stream( + scan_task, + sender, + morsel_size, + maintain_order, + io_stats.clone(), + runtime_stats.clone(), + ), + self.name(), + ); } Ok(()) } diff --git a/src/daft-local-execution/src/sources/source.rs b/src/daft-local-execution/src/sources/source.rs index 543cafe94b..818e361589 100644 --- a/src/daft-local-execution/src/sources/source.rs +++ b/src/daft-local-execution/src/sources/source.rs @@ -23,7 +23,7 @@ pub(crate) trait Source: Send + Sync { runtime_handle: &mut ExecutionRuntimeHandle, runtime_stats: Arc, io_stats: IOStatsRef, - ) -> DaftResult<()>; + ) -> crate::Result<()>; } struct SourceNode { @@ -76,7 +76,7 @@ impl PipelineNode for SourceNode { &mut self, destination: MultiSender, runtime_handle: &mut ExecutionRuntimeHandle, - ) -> DaftResult<()> { + ) -> crate::Result<()> { self.source.get_data( destination, runtime_handle,