[CHORE] connect: Optimize plans in connect (#3378)
1. Optimize logical plans before executing them in connect.
2. Run the native executor on a spawned task on the connect runtime
instead of a dedicated thread. The native executor already spawns its
own thread and returns an awaitable receiver, so we can simply receive
results from the connect runtime (see the sketch below).
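
For context, a condensed sketch of the new execution path (abridged from the root.rs diff below, not a complete handler: `command`, `context`, `translation::to_logical_plan`, and `gen_response` come from the surrounding connect code, and `futures::StreamExt` must be in scope for `.next()`):

    let (tx, rx) = tokio::sync::mpsc::channel::<eyre::Result<ExecutePlanResponse>>(1);
    tokio::spawn(async move {
        let execution_fut = async {
            // Build and optimize the logical plan before execution.
            let plan = translation::to_logical_plan(command)?;
            let optimized_plan = plan.optimize()?;
            let cfg = DaftExecutionConfig::default();
            // The native executor spawns its own thread and hands back an
            // awaitable result stream, so no dedicated thread is needed here.
            let native_executor = NativeExecutor::from_logical_plan_builder(&optimized_plan)?;
            let mut result_stream = native_executor
                .run(HashMap::new(), cfg.into(), None)?
                .into_stream();
            while let Some(result) = result_stream.next().await {
                let tables = result?.get_tables()?;
                for table in tables.as_slice() {
                    let response = context.gen_response(table)?;
                    // Stop quietly if the client has gone away.
                    if tx.send(Ok(response)).await.is_err() {
                        return Ok(());
                    }
                }
            }
            Ok(())
        };
        // Surface any execution error to the response stream.
        if let Err(e) = execution_fut.await {
            let _ = tx.send(Err(e)).await;
        }
    });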

---------

Co-authored-by: Colin Ho <[email protected]>
colin-ho and Colin Ho authored Nov 21, 2024
1 parent d9d916b commit f6eb993
Showing 7 changed files with 215 additions and 116 deletions.
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

4 changes: 1 addition & 3 deletions src/daft-connect/Cargo.toml
@@ -3,7 +3,6 @@ arrow2 = {workspace = true}
async-stream = "0.3.6"
common-daft-config = {workspace = true}
daft-local-execution = {workspace = true}
daft-local-plan = {workspace = true}
daft-logical-plan = {workspace = true}
daft-scan = {workspace = true}
daft-table = {workspace = true}
@@ -13,13 +12,12 @@ futures = "0.3.31"
pyo3 = {workspace = true, optional = true}
spark-connect = {workspace = true}
tokio = {version = "1.40.0", features = ["full"]}
tokio-util = {workspace = true}
tonic = "0.12.3"
tracing = {workspace = true}
uuid = {version = "1.10.0", features = ["v4"]}

[features]
python = ["dep:pyo3", "common-daft-config/python", "daft-local-execution/python", "daft-local-plan/python", "daft-logical-plan/python", "daft-scan/python", "daft-table/python"]
python = ["dep:pyo3", "common-daft-config/python", "daft-local-execution/python", "daft-logical-plan/python", "daft-scan/python", "daft-table/python"]

[lints]
workspace = true
36 changes: 16 additions & 20 deletions src/daft-connect/src/op/execute/root.rs
@@ -1,9 +1,9 @@
use std::{collections::HashMap, future::ready};

use common_daft_config::DaftExecutionConfig;
use daft_local_execution::NativeExecutor;
use futures::stream;
use spark_connect::{ExecutePlanResponse, Relation};
use tokio_util::sync::CancellationToken;
use tonic::{codegen::tokio_stream::wrappers::ReceiverStream, Status};

use crate::{
@@ -28,36 +28,32 @@ impl Session {

let finished = context.finished();

let (tx, rx) = tokio::sync::mpsc::channel::<eyre::Result<ExecutePlanResponse>>(16);
std::thread::spawn(move || {
let result = (|| -> eyre::Result<()> {
let (tx, rx) = tokio::sync::mpsc::channel::<eyre::Result<ExecutePlanResponse>>(1);
tokio::spawn(async move {
let execution_fut = async {
let plan = translation::to_logical_plan(command)?;
let logical_plan = plan.build();
let physical_plan = daft_local_plan::translate(&logical_plan)?;

let optimized_plan = plan.optimize()?;
let cfg = DaftExecutionConfig::default();
let results = daft_local_execution::run_local(
&physical_plan,
HashMap::new(),
cfg.into(),
None,
CancellationToken::new(), // todo: maybe implement cancelling
)?;
let native_executor = NativeExecutor::from_logical_plan_builder(&optimized_plan)?;
let mut result_stream = native_executor
.run(HashMap::new(), cfg.into(), None)?
.into_stream();

for result in results {
while let Some(result) = result_stream.next().await {
let result = result?;
let tables = result.get_tables()?;

for table in tables.as_slice() {
let response = context.gen_response(table)?;
tx.blocking_send(Ok(response)).unwrap();
if tx.send(Ok(response)).await.is_err() {
return Ok(());
}
}
}
Ok(())
})();
};

if let Err(e) = result {
tx.blocking_send(Err(e)).unwrap();
if let Err(e) = execution_fut.await {
let _ = tx.send(Err(e)).await;
}
});

4 changes: 4 additions & 0 deletions src/daft-local-execution/src/channel.rs
@@ -16,6 +16,10 @@ impl<T> Receiver<T> {
pub(crate) fn blocking_recv(&self) -> Option<T> {
self.0.recv().ok()
}

pub(crate) fn into_inner(self) -> loole::Receiver<T> {
self.0
}
}

pub(crate) fn create_channel<T: Clone>(buffer_size: usize) -> (Sender<T>, Receiver<T>) {
6 changes: 4 additions & 2 deletions src/daft-local-execution/src/lib.rs
@@ -18,7 +18,7 @@ use std::{
use common_error::{DaftError, DaftResult};
use common_runtime::RuntimeTask;
use lazy_static::lazy_static;
pub use run::{run_local, NativeExecutor};
pub use run::{run_local, ExecutionEngineResult, NativeExecutor};
use snafu::{futures::TryFutureExt, ResultExt, Snafu};

lazy_static! {
@@ -200,6 +200,8 @@ type Result<T, E = Error> = std::result::Result<T, E>;

#[cfg(feature = "python")]
pub fn register_modules(parent: &Bound<PyModule>) -> PyResult<()> {
parent.add_class::<NativeExecutor>()?;
use run::PyNativeExecutor;

parent.add_class::<PyNativeExecutor>()?;
Ok(())
}
205 changes: 149 additions & 56 deletions src/daft-local-execution/src/run.rs
@@ -10,7 +10,10 @@ use common_daft_config::DaftExecutionConfig;
use common_error::DaftResult;
use common_tracing::refresh_chrome_trace;
use daft_local_plan::{translate, LocalPhysicalPlan};
use daft_logical_plan::LogicalPlanBuilder;
use daft_micropartition::MicroPartition;
use futures::{FutureExt, Stream};
use loole::RecvFuture;
use tokio_util::sync::CancellationToken;
#[cfg(feature = "python")]
use {
@@ -44,32 +44,25 @@ impl LocalPartitionIterator {
}
}

#[cfg_attr(feature = "python", pyclass(module = "daft.daft"))]
pub struct NativeExecutor {
local_physical_plan: Arc<LocalPhysicalPlan>,
cancel: CancellationToken,
}

impl Drop for NativeExecutor {
fn drop(&mut self) {
self.cancel.cancel();
}
#[cfg_attr(
feature = "python",
pyclass(module = "daft.daft", name = "NativeExecutor")
)]
pub struct PyNativeExecutor {
executor: NativeExecutor,
}

#[cfg(feature = "python")]
#[pymethods]
impl NativeExecutor {
impl PyNativeExecutor {
#[staticmethod]
pub fn from_logical_plan_builder(
logical_plan_builder: &PyLogicalPlanBuilder,
py: Python,
) -> PyResult<Self> {
py.allow_threads(|| {
let logical_plan = logical_plan_builder.builder.build();
let local_physical_plan = translate(&logical_plan)?;
Ok(Self {
local_physical_plan,
cancel: CancellationToken::new(),
executor: NativeExecutor::from_logical_plan_builder(&logical_plan_builder.builder)?,
})
})
}
@@ -94,13 +90,9 @@ impl NativeExecutor {
})
.collect();
let out = py.allow_threads(|| {
run_local(
&self.local_physical_plan,
native_psets,
cfg.config,
results_buffer_size,
self.cancel.clone(),
)
self.executor
.run(native_psets, cfg.config, results_buffer_size)
.map(|res| res.into_iter())
})?;
let iter = Box::new(out.map(|part| {
part.map(|p| pyo3::Python::with_gil(|py| PyMicroPartition::from(p).into_py(py)))
@@ -110,6 +102,45 @@ impl NativeExecutor {
}
}

pub struct NativeExecutor {
local_physical_plan: Arc<LocalPhysicalPlan>,
cancel: CancellationToken,
}

impl NativeExecutor {
pub fn from_logical_plan_builder(
logical_plan_builder: &LogicalPlanBuilder,
) -> DaftResult<Self> {
let logical_plan = logical_plan_builder.build();
let local_physical_plan = translate(&logical_plan)?;
Ok(Self {
local_physical_plan,
cancel: CancellationToken::new(),
})
}

pub fn run(
&self,
psets: HashMap<String, Vec<Arc<MicroPartition>>>,
cfg: Arc<DaftExecutionConfig>,
results_buffer_size: Option<usize>,
) -> DaftResult<ExecutionEngineResult> {
run_local(
&self.local_physical_plan,
psets,
cfg,
results_buffer_size,
self.cancel.clone(),
)
}
}

impl Drop for NativeExecutor {
fn drop(&mut self) {
self.cancel.cancel();
}
}

fn should_enable_explain_analyze() -> bool {
let explain_var_name = "DAFT_DEV_ENABLE_EXPLAIN_ANALYZE";
if let Ok(val) = std::env::var(explain_var_name)
@@ -121,13 +152,105 @@ fn should_enable_explain_analyze() -> bool {
}
}

pub struct ExecutionEngineReceiverIterator {
receiver: Receiver<Arc<MicroPartition>>,
handle: Option<std::thread::JoinHandle<DaftResult<()>>>,
}

impl Iterator for ExecutionEngineReceiverIterator {
type Item = DaftResult<Arc<MicroPartition>>;

fn next(&mut self) -> Option<Self::Item> {
match self.receiver.blocking_recv() {
Some(part) => Some(Ok(part)),
None => {
if self.handle.is_some() {
let join_result = self
.handle
.take()
.unwrap()
.join()
.expect("Execution engine thread panicked");
match join_result {
Ok(()) => None,
Err(e) => Some(Err(e)),
}
} else {
None
}
}
}
}
}

pub struct ExecutionEngineReceiverStream {
receive_fut: RecvFuture<Arc<MicroPartition>>,
handle: Option<std::thread::JoinHandle<DaftResult<()>>>,
}

impl Stream for ExecutionEngineReceiverStream {
type Item = DaftResult<Arc<MicroPartition>>;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
match self.receive_fut.poll_unpin(cx) {
std::task::Poll::Ready(Ok(part)) => std::task::Poll::Ready(Some(Ok(part))),
std::task::Poll::Ready(Err(_)) => {
if self.handle.is_some() {
let join_result = self
.handle
.take()
.unwrap()
.join()
.expect("Execution engine thread panicked");
match join_result {
Ok(()) => std::task::Poll::Ready(None),
Err(e) => std::task::Poll::Ready(Some(Err(e))),
}
} else {
std::task::Poll::Ready(None)
}
}
std::task::Poll::Pending => std::task::Poll::Pending,
}
}
}

pub struct ExecutionEngineResult {
handle: std::thread::JoinHandle<DaftResult<()>>,
receiver: Receiver<Arc<MicroPartition>>,
}

impl ExecutionEngineResult {
pub fn into_stream(self) -> impl Stream<Item = DaftResult<Arc<MicroPartition>>> {
ExecutionEngineReceiverStream {
receive_fut: self.receiver.into_inner().recv_async(),
handle: Some(self.handle),
}
}
}

impl IntoIterator for ExecutionEngineResult {
type Item = DaftResult<Arc<MicroPartition>>;
type IntoIter = ExecutionEngineReceiverIterator;

fn into_iter(self) -> Self::IntoIter {
ExecutionEngineReceiverIterator {
receiver: self.receiver,
handle: Some(self.handle),
}
}
}

pub fn run_local(
physical_plan: &LocalPhysicalPlan,
psets: HashMap<String, Vec<Arc<MicroPartition>>>,
cfg: Arc<DaftExecutionConfig>,
results_buffer_size: Option<usize>,
cancel: CancellationToken,
) -> DaftResult<Box<dyn Iterator<Item = DaftResult<Arc<MicroPartition>>> + Send>> {
) -> DaftResult<ExecutionEngineResult> {
refresh_chrome_trace();
let pipeline = physical_plan_to_pipeline(physical_plan, &psets, &cfg)?;
let (tx, rx) = create_channel(results_buffer_size.unwrap_or(1));
@@ -188,38 +311,8 @@ pub fn run_local(
})
});

struct ReceiverIterator {
receiver: Receiver<Arc<MicroPartition>>,
handle: Option<std::thread::JoinHandle<DaftResult<()>>>,
}

impl Iterator for ReceiverIterator {
type Item = DaftResult<Arc<MicroPartition>>;

fn next(&mut self) -> Option<Self::Item> {
match self.receiver.blocking_recv() {
Some(part) => Some(Ok(part)),
None => {
if self.handle.is_some() {
let join_result = self
.handle
.take()
.unwrap()
.join()
.expect("Execution engine thread panicked");
match join_result {
Ok(()) => None,
Err(e) => Some(Err(e)),
}
} else {
None
}
}
}
}
}
Ok(Box::new(ReceiverIterator {
Ok(ExecutionEngineResult {
handle,
receiver: rx,
handle: Some(handle),
}))
})
}
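
For reference, a minimal usage sketch, not part of this diff, of the two ways an `ExecutionEngineResult` can now be consumed. It assumes `executor` is a `NativeExecutor` and that `psets` and `cfg` are already built; the async path needs `futures::StreamExt` in scope for `.next()`:

    // Blocking path, as used by PyNativeExecutor::run: iterate on the calling thread.
    let results = executor.run(psets.clone(), cfg.clone(), None)?;
    for part in results {
        let part: Arc<MicroPartition> = part?;
        // ... consume the micropartition
    }

    // Async path, as used by daft-connect: turn the result into a Stream and await it.
    let mut stream = executor.run(psets, cfg, None)?.into_stream();
    while let Some(part) = stream.next().await {
        let part: Arc<MicroPartition> = part?;
        // ... consume the micropartition
    }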