Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Make Provider a Future, remove Provider::join (#120)
Browse files Browse the repository at this point in the history
This removes the consuming Provider::join method and makes Provider impl
a
Future directly.  This mirrors how a JoinHandle for a tokio task
behaves, which is what this also kind off is.

Primarily this allowes for the await-completion operation to be
cancelable and re-entrant.  This is needed to be able to await
completion of the task in a loop, repeatedly calling tokio::select!
with it.
  • Loading branch information
flub authored Feb 6, 2023
1 parent 321ed44 commit 9a22de7
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 12 deletions.
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ mod tests {
.await?;

provider.abort();
let _ = provider.join().await;
provider.await.ok(); // .abort() makes this a Result::Err

let events = events_task.await.unwrap();
assert_eq!(events.len(), 3);
Expand Down
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ async fn main() -> Result<()> {
out_writer
.println(format!("All-in-one ticket: {}", provider.ticket(hash)))
.await;
provider.join().await?;
provider.await?;

// Drop tempath to signal it can be destroyed
drop(tmp_path);
Expand Down
27 changes: 17 additions & 10 deletions src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@
//!
//! To shut down the provider, call [Provider::abort].
use std::fmt::{self, Display};
use std::future::Future;
use std::io::{BufReader, Read};
use std::net::SocketAddr;
use std::path::PathBuf;
use std::pin::Pin;
use std::str::FromStr;
use std::task::Poll;
use std::{collections::HashMap, sync::Arc};

use anyhow::{anyhow, bail, ensure, Context, Result};
Expand Down Expand Up @@ -49,9 +52,9 @@ impl Database {
/// You must supply a database which can be created using [`create_collection`], everything else is
/// optional. Finally you can create and run the provider by calling [`Builder::spawn`].
///
/// The returned [`Provider`] provides [`Provider::join`] to wait for the spawned task.
/// Currently it needs to be aborted using [`Provider::abort`], graceful shutdown will be
/// implemented in the immediate future.
/// The returned [`Provider`] is awaitable to know when it finishes. Currently it needs to
/// be aborted using [`Provider::abort`], graceful shutdown will be implemented in the
/// future.
#[derive(Debug)]
pub struct Builder {
bind_addr: SocketAddr,
Expand Down Expand Up @@ -172,7 +175,8 @@ impl Builder {
/// The only way to create this is by using the [`Builder::spawn`]. [`Provider::builder`]
/// is a shorthand to create a suitable [`Builder`].
///
/// This runs a tokio task which can be aborted and joined if desired.
/// This runs a tokio task which can be aborted and joined if desired. To join the task
/// await the [`Provider`] struct directly, it will complete when the task completes.
#[derive(Debug)]
pub struct Provider {
listen_addr: SocketAddr,
Expand Down Expand Up @@ -257,12 +261,6 @@ impl Provider {
}
}

/// Blocks until the provider task completes.
// TODO: Maybe implement Future directly?
pub async fn join(self) -> Result<(), JoinError> {
self.task.await
}

/// Aborts the provider.
///
/// TODO: temporary, do graceful shutdown instead.
Expand All @@ -271,6 +269,15 @@ impl Provider {
}
}

/// The future completes when the spawned tokio task finishes.
impl Future for Provider {
type Output = Result<(), JoinError>;

fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.task).poll(cx)
}
}

async fn handle_stream(
db: Database,
token: AuthToken,
Expand Down

0 comments on commit 9a22de7

Please sign in to comment.