From e751f610da787850e66a60e7810e3e56f8bde4c8 Mon Sep 17 00:00:00 2001 From: Colin Ho Date: Thu, 14 Nov 2024 16:22:04 +0800 Subject: [PATCH] pin project --- Cargo.lock | 1 + src/common/runtime/src/lib.rs | 2 +- src/daft-local-execution/Cargo.toml | 1 + src/daft-local-execution/src/lib.rs | 17 ++++++++--------- 4 files changed, 11 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e6af411984..c9e3f852aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2214,6 +2214,7 @@ dependencies = [ "log", "loole", "num-format", + "pin-project", "pyo3", "snafu", "tokio", diff --git a/src/common/runtime/src/lib.rs b/src/common/runtime/src/lib.rs index 41803f06d7..54e2555e61 100644 --- a/src/common/runtime/src/lib.rs +++ b/src/common/runtime/src/lib.rs @@ -58,7 +58,7 @@ impl Future for RuntimeTask { type Output = DaftResult; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match Pin::new(&mut self.joinset).poll_join_next(cx) { + match self.joinset.poll_join_next(cx) { Poll::Ready(Some(result)) => { Poll::Ready(result.map_err(|e| DaftError::External(e.into()))) } diff --git a/src/daft-local-execution/Cargo.toml b/src/daft-local-execution/Cargo.toml index 60ae89e875..06ddd3efee 100644 --- a/src/daft-local-execution/Cargo.toml +++ b/src/daft-local-execution/Cargo.toml @@ -27,6 +27,7 @@ lazy_static = {workspace = true} log = {workspace = true} loole = "0.4.0" num-format = "0.4.4" +pin-project = "1" pyo3 = {workspace = true, optional = true} snafu = {workspace = true} tokio = {workspace = true} diff --git a/src/daft-local-execution/src/lib.rs b/src/daft-local-execution/src/lib.rs index b3db9809c4..bda9bfcd09 100644 --- a/src/daft-local-execution/src/lib.rs +++ b/src/daft-local-execution/src/lib.rs @@ -29,23 +29,22 @@ lazy_static! { /// It can be either `Ready` or `Pending`. /// If the output is `Ready`, the value is immediately available. /// If the output is `Pending`, the value is not yet available and a `RuntimeTask` is returned. +#[pin_project::pin_project(project = OperatorOutputProj)] pub(crate) enum OperatorOutput { Ready(Option), - Pending(RuntimeTask), + Pending(#[pin] RuntimeTask), } impl Future for OperatorOutput { type Output = DaftResult; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); - - match this { - Self::Ready(value) => { + match self.project() { + OperatorOutputProj::Ready(value) => { let value = value.take().unwrap(); Poll::Ready(Ok(value)) } - Self::Pending(task) => Pin::new(task).poll(cx), + OperatorOutputProj::Pending(task) => task.poll(cx), } } } @@ -89,13 +88,13 @@ impl TaskSet { } } -struct SpawnedTask(tokio::task::JoinHandle); +#[pin_project::pin_project] +struct SpawnedTask(#[pin] tokio::task::JoinHandle); impl Future for SpawnedTask { type Output = crate::Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); - Pin::new(&mut this.0).poll(cx).map(|r| r.context(JoinSnafu)) + self.project().0.poll(cx).map(|r| r.context(JoinSnafu)) } }