From 9a22de7e1fb23ce0127fa51f0d98aae73314b1f4 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Mon, 6 Feb 2023 17:29:07 +0100 Subject: [PATCH] Make Provider a Future, remove Provider::join (#120) This removes the consuming Provider::join method and makes Provider impl a Future directly. This mirrors how a JoinHandle for a tokio task behaves, which is what this also kind off is. Primarily this allowes for the await-completion operation to be cancelable and re-entrant. This is needed to be able to await completion of the task in a loop, repeatedly calling tokio::select! with it. --- src/lib.rs | 2 +- src/main.rs | 2 +- src/provider.rs | 27 +++++++++++++++++---------- 3 files changed, 19 insertions(+), 12 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 1c035bc..4bd8c6b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -246,7 +246,7 @@ mod tests { .await?; provider.abort(); - let _ = provider.join().await; + provider.await.ok(); // .abort() makes this a Result::Err let events = events_task.await.unwrap(); assert_eq!(events.len(), 3); diff --git a/src/main.rs b/src/main.rs index 6fdc9e8..807055c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -204,7 +204,7 @@ async fn main() -> Result<()> { out_writer .println(format!("All-in-one ticket: {}", provider.ticket(hash))) .await; - provider.join().await?; + provider.await?; // Drop tempath to signal it can be destroyed drop(tmp_path); diff --git a/src/provider.rs b/src/provider.rs index c9e3468..93ac4b0 100644 --- a/src/provider.rs +++ b/src/provider.rs @@ -8,10 +8,13 @@ //! //! To shut down the provider, call [Provider::abort]. use std::fmt::{self, Display}; +use std::future::Future; use std::io::{BufReader, Read}; use std::net::SocketAddr; use std::path::PathBuf; +use std::pin::Pin; use std::str::FromStr; +use std::task::Poll; use std::{collections::HashMap, sync::Arc}; use anyhow::{anyhow, bail, ensure, Context, Result}; @@ -49,9 +52,9 @@ impl Database { /// You must supply a database which can be created using [`create_collection`], everything else is /// optional. Finally you can create and run the provider by calling [`Builder::spawn`]. /// -/// The returned [`Provider`] provides [`Provider::join`] to wait for the spawned task. -/// Currently it needs to be aborted using [`Provider::abort`], graceful shutdown will be -/// implemented in the immediate future. +/// The returned [`Provider`] is awaitable to know when it finishes. Currently it needs to +/// be aborted using [`Provider::abort`], graceful shutdown will be implemented in the +/// future. #[derive(Debug)] pub struct Builder { bind_addr: SocketAddr, @@ -172,7 +175,8 @@ impl Builder { /// The only way to create this is by using the [`Builder::spawn`]. [`Provider::builder`] /// is a shorthand to create a suitable [`Builder`]. /// -/// This runs a tokio task which can be aborted and joined if desired. +/// This runs a tokio task which can be aborted and joined if desired. To join the task +/// await the [`Provider`] struct directly, it will complete when the task completes. #[derive(Debug)] pub struct Provider { listen_addr: SocketAddr, @@ -257,12 +261,6 @@ impl Provider { } } - /// Blocks until the provider task completes. - // TODO: Maybe implement Future directly? - pub async fn join(self) -> Result<(), JoinError> { - self.task.await - } - /// Aborts the provider. /// /// TODO: temporary, do graceful shutdown instead. @@ -271,6 +269,15 @@ impl Provider { } } +/// The future completes when the spawned tokio task finishes. +impl Future for Provider { + type Output = Result<(), JoinError>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + Pin::new(&mut self.task).poll(cx) + } +} + async fn handle_stream( db: Database, token: AuthToken,