[CHORE] Add error snafus for local executor (#2660)
Example:
```
df = daft.from_pydict({"a": ["foo", "bar", "baz"]})
df = df.select(df["a"].str.match("["))
df.show()
Error when running pipeline node ProjectOperator
Traceback (most recent call last):
  File "test.py", line 21, in <module>
    df.show()
  File "/Users/colinho/Desktop/Daft/daft/api_annotations.py", line 26, in _wrap
    return timed_method(*args, **kwargs)
  File "/Users/colinho/Desktop/Daft/daft/analytics.py", line 185, in tracked_method
    return method(*args, **kwargs)
  File "/Users/colinho/Desktop/Daft/daft/dataframe/dataframe.py", line 2278, in show
    dataframe_display = self._construct_show_display(n)
  File "/Users/colinho/Desktop/Daft/daft/dataframe/dataframe.py", line 2235, in _construct_show_display
    for table in get_context().runner().run_iter_tables(builder, results_buffer_size=1):
  File "/Users/colinho/Desktop/Daft/daft/runners/pyrunner.py", line 216, in run_iter_tables
    for result in self.run_iter(builder, results_buffer_size=results_buffer_size):
  File "/Users/colinho/Desktop/Daft/daft/runners/pyrunner.py", line 197, in run_iter
    results_gen = executor.run(
  File "/Users/colinho/Desktop/Daft/daft/execution/native_executor.py", line 33, in run
    PyMaterializedResult(MicroPartition._from_pymicropartition(part)) for part in self._executor.run(psets_mp)
daft.exceptions.DaftCoreException: DaftError::ValueError regex parse error:
    [
    ^
error: unclosed character class
```
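
For context, the gist of the change: every task a pipeline node spawns now carries a snafu context recording the node's name, so a failure is logged with its location while the original `DaftError` is preserved for the user (as in the traceback above). Below is a minimal, self-contained sketch of that pattern. It assumes snafu ≥ 0.7 (which generates the `PipelineExecutionSnafu` selector) with the `futures` feature, and tokio with `macros`/`rt-multi-thread`; `fake_node_task` and its `std::io::Error` payload are hypothetical stand-ins, not Daft code.
```
use snafu::{futures::TryFutureExt, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("Error when running pipeline node {}: {}", node_name, source))]
    PipelineExecutionError {
        source: std::io::Error,
        node_name: String,
    },
}

type Result<T, E = Error> = std::result::Result<T, E>;

// Stand-in for a pipeline node's worker task that fails mid-execution.
async fn fake_node_task() -> std::io::Result<()> {
    Err(std::io::Error::new(
        std::io::ErrorKind::Other,
        "regex parse error: unclosed character class",
    ))
}

#[tokio::main]
async fn main() {
    let mut worker_set: tokio::task::JoinSet<Result<()>> = tokio::task::JoinSet::new();

    // As in ExecutionRuntimeHandle::spawn, attach the node name to the task's
    // error before handing the future to the JoinSet.
    worker_set.spawn(
        fake_node_task().with_context(|_| PipelineExecutionSnafu {
            node_name: "ProjectOperator",
        }),
    );

    while let Some(joined) = worker_set.join_next().await {
        if let Ok(Err(e)) = joined {
            // Prints: Error when running pipeline node ProjectOperator: regex parse error: ...
            eprintln!("{e}");
        }
    }
}
```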

---------

Co-authored-by: Colin Ho <[email protected]>
Co-authored-by: Colin Ho <[email protected]>
3 people authored Aug 17, 2024
1 parent b15b0c7 commit 39cd0ad
Showing 12 changed files with 165 additions and 115 deletions.
1 change: 1 addition & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions src/daft-local-execution/Cargo.toml
@@ -18,6 +18,7 @@ daft-stats = {path = "../daft-stats", default-features = false}
daft-table = {path = "../daft-table", default-features = false}
futures = {workspace = true}
lazy_static = {workspace = true}
log = {workspace = true}
num-format = "0.4.4"
pyo3 = {workspace = true, optional = true}
snafu = {workspace = true}
19 changes: 11 additions & 8 deletions src/daft-local-execution/src/intermediate_ops/intermediate_op.rs
@@ -88,12 +88,15 @@ impl IntermediateNode
for _ in 0..num_senders {
let (worker_sender, worker_receiver) = create_single_channel(1);
let destination_sender = destination.get_next_sender();
runtime_handle.spawn(Self::run_worker(
self.intermediate_op.clone(),
worker_receiver,
destination_sender,
self.runtime_stats.clone(),
));
runtime_handle.spawn(
Self::run_worker(
self.intermediate_op.clone(),
worker_receiver,
destination_sender,
self.runtime_stats.clone(),
),
self.intermediate_op.name(),
);
worker_senders.push(worker_sender);
}
worker_senders
@@ -152,7 +155,7 @@ impl PipelineNode for IntermediateNode
&mut self,
mut destination: MultiSender,
runtime_handle: &mut ExecutionRuntimeHandle,
) -> DaftResult<()> {
) -> crate::Result<()> {
assert_eq!(
self.children.len(),
1,
@@ -166,7 +169,7 @@
child.start(sender, runtime_handle).await?;

let worker_senders = self.spawn_workers(&mut destination, runtime_handle).await;
runtime_handle.spawn(Self::send_to_workers(receiver, worker_senders));
runtime_handle.spawn(Self::send_to_workers(receiver, worker_senders), self.name());
Ok(())
}
fn as_tree_display(&self) -> &dyn TreeDisplay {
35 changes: 30 additions & 5 deletions src/daft-local-execution/src/lib.rs
@@ -9,14 +9,14 @@ mod sources;
use common_error::{DaftError, DaftResult};
use lazy_static::lazy_static;
pub use run::NativeExecutor;
use snafu::futures::TryFutureExt;
use snafu::Snafu;

lazy_static! {
pub static ref NUM_CPUS: usize = std::thread::available_parallelism().unwrap().get();
}

pub struct ExecutionRuntimeHandle {
pub worker_set: tokio::task::JoinSet<DaftResult<()>>,
pub worker_set: tokio::task::JoinSet<crate::Result<()>>,
}

impl Default for ExecutionRuntimeHandle {
@@ -34,11 +34,14 @@ impl ExecutionRuntimeHandle
pub fn spawn(
&mut self,
task: impl std::future::Future<Output = DaftResult<()>> + Send + 'static,
node_name: &str,
) {
self.worker_set.spawn(task);
let node_name = node_name.to_string();
self.worker_set
.spawn(task.with_context(|_| PipelineExecutionSnafu { node_name }));
}

pub async fn join_next(&mut self) -> Option<Result<DaftResult<()>, tokio::task::JoinError>> {
pub async fn join_next(&mut self) -> Option<Result<crate::Result<()>, tokio::task::JoinError>> {
self.worker_set.join_next().await
}

@@ -63,14 +66,36 @@ pub enum Error {
OneShotRecvError {
source: tokio::sync::oneshot::error::RecvError,
},
#[snafu(display("Error creating pipeline from {}: {}", plan_name, source))]
PipelineCreationError {
source: DaftError,
plan_name: String,
},
#[snafu(display("Error when running pipeline node {}: {}", node_name, source))]
PipelineExecutionError {
source: DaftError,
node_name: String,
},
}

impl From<Error> for DaftError {
fn from(err: Error) -> DaftError {
DaftError::External(err.into())
match err {
Error::PipelineCreationError { source, plan_name } => {
log::error!("Error creating pipeline from {}", plan_name);
source
}
Error::PipelineExecutionError { source, node_name } => {
log::error!("Error when running pipeline node {}", node_name);
source
}
_ => DaftError::External(err.into()),
}
}
}

type Result<T, E = Error> = std::result::Result<T, E>;

#[cfg(feature = "python")]
pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> {
parent.add_class::<NativeExecutor>()?;
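
Note how the `From<Error> for DaftError` conversion above deliberately unwraps the snafu wrapper: it logs which pipeline node or plan failed and then returns the original source error, so the Python-facing exception keeps its original `DaftError` type and message (the `ValueError` in the commit-message example). A simplified sketch of that log-and-unwrap shape, with a hypothetical `EngineError` standing in for `DaftError`:
```
// A hypothetical stand-in for DaftError, so the snippet compiles on its own.
#[derive(Debug)]
enum EngineError {
    Value(String),
}

#[derive(Debug)]
enum Error {
    PipelineExecutionError { source: EngineError, node_name: String },
}

impl From<Error> for EngineError {
    fn from(err: Error) -> EngineError {
        match err {
            // Log where the failure happened, then hand back the original
            // error so the user-facing message is unchanged.
            Error::PipelineExecutionError { source, node_name } => {
                eprintln!("Error when running pipeline node {node_name}");
                source
            }
        }
    }
}

fn main() {
    let wrapped = Error::PipelineExecutionError {
        source: EngineError::Value("regex parse error: unclosed character class".into()),
        node_name: "ProjectOperator".into(),
    };
    // Logs the node name to stderr, then prints the unwrapped source error:
    // Value("regex parse error: unclosed character class")
    println!("{:?}", EngineError::from(wrapped));
}
```
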
13 changes: 8 additions & 5 deletions src/daft-local-execution/src/pipeline.rs
@@ -14,19 +14,19 @@ use crate::{
streaming_sink::StreamingSinkNode,
},
sources::in_memory::InMemorySource,
ExecutionRuntimeHandle,
ExecutionRuntimeHandle, PipelineCreationSnafu,
};

use async_trait::async_trait;
use common_display::{mermaid::MermaidDisplayVisitor, tree::TreeDisplay};
use common_error::DaftResult;
use daft_dsl::Expr;
use daft_micropartition::MicroPartition;
use daft_physical_plan::{
Filter, HashAggregate, HashJoin, InMemoryScan, Limit, LocalPhysicalPlan, Project, Sort,
UnGroupedAggregate,
};
use daft_plan::populate_aggregation_stages;
use snafu::ResultExt;

use crate::channel::MultiSender;

@@ -38,7 +38,7 @@ pub trait PipelineNode: Sync + Send + TreeDisplay {
&mut self,
destination: MultiSender,
runtime_handle: &mut ExecutionRuntimeHandle,
) -> DaftResult<()>;
) -> crate::Result<()>;

fn as_tree_display(&self) -> &dyn TreeDisplay;
}
@@ -58,7 +58,7 @@ pub(crate) fn viz_pipeline(root: &dyn PipelineNode) -> String {
pub fn physical_plan_to_pipeline(
physical_plan: &LocalPhysicalPlan,
psets: &HashMap<String, Vec<Arc<MicroPartition>>>,
) -> DaftResult<Box<dyn PipelineNode>> {
) -> crate::Result<Box<dyn PipelineNode>> {
use crate::sources::scan_task::ScanTaskSource;
use daft_physical_plan::PhysicalScan;
let out: Box<dyn PipelineNode> = match physical_plan {
@@ -199,7 +199,10 @@ pub fn physical_plan_to_pipeline(
*join_type,
left_schema,
right_schema,
)?;
)
.with_context(|_| PipelineCreationSnafu {
plan_name: physical_plan.name(),
})?;
HashJoinNode::new(sink, left_node, right_node).boxed()
}
_ => {
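
The pipeline-construction path gets the same treatment synchronously: `snafu::ResultExt::with_context` tags a failing build step with the plan name via `PipelineCreationSnafu`, as in the hash-join branch above. A minimal sketch, assuming snafu ≥ 0.7; `build_probe_sink` and its `ParseIntError` failure are hypothetical stand-ins for the real hash-join build step:
```
use snafu::{ResultExt, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("Error creating pipeline from {}: {}", plan_name, source))]
    PipelineCreationError {
        source: std::num::ParseIntError,
        plan_name: String,
    },
}

// Hypothetical stand-in for a pipeline-construction step that can fail.
fn build_probe_sink() -> Result<u32, std::num::ParseIntError> {
    "not-a-number".parse()
}

fn physical_plan_to_pipeline(plan_name: &str) -> Result<u32, Error> {
    // Attach the plan name to the error at pipeline-creation time.
    build_probe_sink().with_context(|_| PipelineCreationSnafu { plan_name })
}

fn main() {
    // Prints: Error creating pipeline from HashJoin: invalid digit found in string
    if let Err(e) = physical_plan_to_pipeline("HashJoin") {
        eprintln!("{e}");
    }
}
```
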
4 changes: 2 additions & 2 deletions src/daft-local-execution/src/run.rs
@@ -121,7 +121,7 @@ pub fn run_local(
.expect("Failed to create tokio runtime");

let res = runtime.block_on(async {
let mut pipeline = physical_plan_to_pipeline(physical_plan, &psets).unwrap();
let mut pipeline = physical_plan_to_pipeline(physical_plan, &psets)?;
let (sender, mut receiver) = create_channel(1, true);

let mut runtime_handle = ExecutionRuntimeHandle::default();
@@ -135,7 +135,7 @@
match result {
Ok(Err(e)) => {
runtime_handle.shutdown().await;
return DaftResult::Err(e);
return DaftResult::Err(e.into());
}
Err(e) => {
runtime_handle.shutdown().await;
47 changes: 25 additions & 22 deletions src/daft-local-execution/src/sinks/blocking_sink.rs
@@ -82,36 +82,39 @@ impl PipelineNode for BlockingSinkNode
&mut self,
mut destination: MultiSender,
runtime_handle: &mut ExecutionRuntimeHandle,
) -> DaftResult<()> {
) -> crate::Result<()> {
let (sender, mut streaming_receiver) = create_channel(*NUM_CPUS, true);
// now we can start building the right side
let child = self.child.as_mut();
child.start(sender, runtime_handle).await?;
let op = self.op.clone();

let rt_context = self.runtime_stats.clone();
runtime_handle.spawn(async move {
let span = info_span!("BlockingSinkNode::execute");
let mut guard = op.lock().await;
while let Some(val) = streaming_receiver.recv().await {
rt_context.mark_rows_received(val.len() as u64);
if let BlockingSinkStatus::Finished =
rt_context.in_span(&span, || guard.sink(&val))?
{
break;
runtime_handle.spawn(
async move {
let span = info_span!("BlockingSinkNode::execute");
let mut guard = op.lock().await;
while let Some(val) = streaming_receiver.recv().await {
rt_context.mark_rows_received(val.len() as u64);
if let BlockingSinkStatus::Finished =
rt_context.in_span(&span, || guard.sink(&val))?
{
break;
}
}
}
let finalized_result = rt_context
.in_span(&info_span!("BlockingSinkNode::finalize"), || {
guard.finalize()
})?;
if let Some(part) = finalized_result {
let len = part.len();
let _ = destination.get_next_sender().send(part).await;
rt_context.mark_rows_emitted(len as u64);
}
Ok(())
});
let finalized_result = rt_context
.in_span(&info_span!("BlockingSinkNode::finalize"), || {
guard.finalize()
})?;
if let Some(part) = finalized_result {
let len = part.len();
let _ = destination.get_next_sender().send(part).await;
rt_context.mark_rows_emitted(len as u64);
}
Ok(())
},
self.name(),
);
Ok(())
}
fn as_tree_display(&self) -> &dyn TreeDisplay {
46 changes: 26 additions & 20 deletions src/daft-local-execution/src/sinks/hash_join.rs
@@ -5,7 +5,7 @@ use crate::{
intermediate_ops::intermediate_op::{IntermediateNode, IntermediateOperator},
pipeline::PipelineNode,
runtime_stats::RuntimeStatsContext,
ExecutionRuntimeHandle, NUM_CPUS,
ExecutionRuntimeHandle, JoinSnafu, PipelineExecutionSnafu, NUM_CPUS,
};
use async_trait::async_trait;
use common_display::tree::TreeDisplay;
@@ -18,6 +18,7 @@ use daft_core::{
use daft_dsl::ExprRef;
use daft_micropartition::MicroPartition;
use daft_plan::JoinType;
use snafu::{futures::TryFutureExt, ResultExt};
use tracing::info_span;

use super::blocking_sink::{BlockingSink, BlockingSinkStatus};
@@ -296,33 +297,38 @@ impl PipelineNode for HashJoinNode
&mut self,
mut destination: MultiSender,
runtime_handle: &mut ExecutionRuntimeHandle,
) -> DaftResult<()> {
) -> crate::Result<()> {
let (sender, mut pt_receiver) = create_channel(*NUM_CPUS, false);
self.left.start(sender, runtime_handle).await?;
let hash_join = self.hash_join.clone();
let build_runtime_stats = self.build_runtime_stats.clone();
let probe_table_build = tokio::spawn(async move {
let span = info_span!("ProbeTable::sink");
let mut guard = hash_join.lock().await;
let sink = guard.as_sink();
while let Some(val) = pt_receiver.recv().await {
build_runtime_stats.mark_rows_received(val.len() as u64);
if let BlockingSinkStatus::Finished =
build_runtime_stats.in_span(&span, || sink.sink(&val))?
{
break;
let name = self.name();
let probe_table_build = tokio::spawn(
async move {
let span = info_span!("ProbeTable::sink");
let mut guard = hash_join.lock().await;
let sink = guard.as_sink();
while let Some(val) = pt_receiver.recv().await {
build_runtime_stats.mark_rows_received(val.len() as u64);
if let BlockingSinkStatus::Finished =
build_runtime_stats.in_span(&span, || sink.sink(&val))?
{
break;
}
}
build_runtime_stats
.in_span(&info_span!("ProbeTable::finalize"), || sink.finalize())?;
DaftResult::Ok(())
}
build_runtime_stats.in_span(&info_span!("ProbeTable::finalize"), || sink.finalize())?;
DaftResult::Ok(())
});
.with_context(move |_| PipelineExecutionSnafu { node_name: name }),
);
// should wrap in context join handle

let (right_sender, streaming_receiver) = create_channel(*NUM_CPUS, destination.in_order());
// now we can start building the right side
self.right.start(right_sender, runtime_handle).await?;

probe_table_build.await.unwrap()?;
probe_table_build.await.context(JoinSnafu {})??;

let hash_join = self.hash_join.clone();
let probing_op = {
Expand All @@ -337,10 +343,10 @@ impl PipelineNode for HashJoinNode {
let worker_senders = probing_node
.spawn_workers(&mut destination, runtime_handle)
.await;
runtime_handle.spawn(IntermediateNode::send_to_workers(
streaming_receiver,
worker_senders,
));
runtime_handle.spawn(
IntermediateNode::send_to_workers(streaming_receiver, worker_senders),
self.name(),
);
Ok(())
}

