From 52ea381104bbd92ea7676ef2c54cb1d86932d4d3 Mon Sep 17 00:00:00 2001 From: Orson Peters Date: Tue, 7 Jan 2025 16:57:04 +0100 Subject: [PATCH] perf(rust): Move morsel distribution to the computational async engine (#20600) --- crates/polars-stream/src/nodes/io_sources/parquet/init.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/polars-stream/src/nodes/io_sources/parquet/init.rs b/crates/polars-stream/src/nodes/io_sources/parquet/init.rs index 7e1effd3431a..d2aa481d5006 100644 --- a/crates/polars-stream/src/nodes/io_sources/parquet/init.rs +++ b/crates/polars-stream/src/nodes/io_sources/parquet/init.rs @@ -159,8 +159,8 @@ impl ParquetSourceNode { })); // Distributes morsels across pipelines. This does not perform any CPU or I/O bound work - - // it is purely a dispatch loop. - let distribute_task = AbortOnDropHandle(io_runtime.spawn(async move { + // it is purely a dispatch loop. Run on the computational executor to reduce context switches. + let distribute_task = async_executor::spawn(TaskPriority::High, async move { let mut morsel_seq = MorselSeq::default(); while let Some(decode_fut) = decode_recv.recv().await { let df = decode_fut.await?; @@ -176,13 +176,13 @@ impl ParquetSourceNode { } } PolarsResult::Ok(()) - })); + }); let join_task = io_runtime.spawn(async move { metadata_task.await.unwrap()?; prefetch_task.await.unwrap()?; decode_task.await.unwrap()?; - distribute_task.await.unwrap()?; + distribute_task.await?; Ok(()) });