Skip to content

Commit

Permalink
chore: replace CancellationToken with AbortableJoinHandle
Browse files Browse the repository at this point in the history
  • Loading branch information
dariusc93 committed Dec 4, 2024
1 parent f7992d7 commit df3edf9
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 62 deletions.
22 changes: 5 additions & 17 deletions extensions/warp-ipfs/src/store/event_subscription.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::rt::{Executor, LocalExecutor};
use crate::rt::{AbortableJoinHandle, Executor, LocalExecutor};
use futures::{
channel::{
mpsc::{channel, Receiver, Sender},
Expand All @@ -7,11 +7,9 @@ use futures::{
stream::BoxStream,
SinkExt, StreamExt,
};
use std::collections::VecDeque;
use std::fmt::Debug;
use std::task::{Poll, Waker};
use std::{collections::VecDeque, sync::Arc};
use tokio::select;
use tokio_util::sync::{CancellationToken, DropGuard};
use warp::error::Error;

#[allow(clippy::large_enum_variant)]
Expand All @@ -27,7 +25,7 @@ enum Command<T: Clone + Debug + Send + 'static> {
#[derive(Clone, Debug)]
pub struct EventSubscription<T: Clone + Debug + Send + 'static> {
tx: Sender<Command<T>>,
_task_cancellation: Arc<DropGuard>,
_handle: AbortableJoinHandle<()>,
}

impl<T: Clone + Debug + Send + 'static> EventSubscription<T> {
Expand All @@ -43,19 +41,9 @@ impl<T: Clone + Debug + Send + 'static> EventSubscription<T> {
rx,
};

let token = CancellationToken::new();
let drop_guard = token.clone().drop_guard();
executor.dispatch(async move {
select! {
_ = token.cancelled() => {}
_ = task.run() => {}
}
});
let _handle = executor.spawn_abortable(async move { task.run().await });

Self {
tx,
_task_cancellation: Arc::new(drop_guard),
}
Self { tx, _handle }
}

pub async fn subscribe<'a>(&self) -> Result<BoxStream<'a, T>, Error> {
Expand Down
18 changes: 5 additions & 13 deletions extensions/warp-ipfs/src/store/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use futures_finally::try_stream::FinallyTryStreamExt;

use rust_ipfs::{unixfs::UnixfsStatus, Ipfs, IpfsPath};

use tokio_util::sync::{CancellationToken, DropGuard};
use tracing::{Instrument, Span};
use warp::{
constellation::{
Expand All @@ -31,7 +30,7 @@ use super::{
document::root::RootDocumentMap, event_subscription::EventSubscription,
MAX_THUMBNAIL_STREAM_SIZE,
};
use crate::rt::{Executor, LocalExecutor};
use crate::rt::{AbortableJoinHandle, Executor, LocalExecutor};
use crate::{
config::{self, Config},
thumbnail::ThumbnailGenerator,
Expand All @@ -44,7 +43,7 @@ pub struct FileStore {
path: Arc<RwLock<PathBuf>>,
config: config::Config,
command_sender: mpsc::Sender<FileTaskCommand>,
_guard: Arc<DropGuard>,
_handle: AbortableJoinHandle<()>,
}

impl FileStore {
Expand All @@ -55,6 +54,7 @@ impl FileStore {
constellation_tx: EventSubscription<ConstellationEventKind>,
span: &Span,
) -> Self {
let executor = LocalExecutor;
let config = config.clone();

let index = Directory::new("root");
Expand Down Expand Up @@ -92,24 +92,16 @@ impl FileStore {
let signal = Some(task.signal_tx.clone());
index.rebuild_paths(&signal);

let token = CancellationToken::new();
let _guard = Arc::new(token.clone().drop_guard());

let span = span.clone();

LocalExecutor.dispatch(async move {
tokio::select! {
_ = task.run().instrument(span) => {}
_ = token.cancelled() => {}
}
});
let _handle = executor.spawn_abortable(async move { task.run().await }.instrument(span));

FileStore {
index,
config,
path,
command_sender,
_guard,
_handle,
}
}
}
Expand Down
20 changes: 3 additions & 17 deletions extensions/warp-ipfs/src/store/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use ipld_core::cid::Cid;
use rust_ipfs::{Ipfs, PeerId};

use serde::{Deserialize, Serialize};
use tokio_util::sync::{CancellationToken, DropGuard};
use uuid::Uuid;

use super::{document::root::RootDocumentMap, ds_key::DataStoreKey, PeerIdExt};
Expand Down Expand Up @@ -79,7 +78,7 @@ pub type DownloadStream = BoxStream<'static, Result<Bytes, std::io::Error>>;
#[derive(Clone)]
pub struct MessageStore {
inner: Arc<tokio::sync::RwLock<ConversationInner>>,
_task_cancellation: Arc<DropGuard>,
_handle: AbortableJoinHandle<()>,
}

impl MessageStore {
Expand All @@ -94,9 +93,6 @@ impl MessageStore {
let executor = LocalExecutor;
tracing::info!("Initializing MessageStore");

let token = CancellationToken::new();
let drop_guard = token.clone().drop_guard();

let root = identity.root_document().clone();

let mut inner = ConversationInner {
Expand Down Expand Up @@ -127,19 +123,9 @@ impl MessageStore {
identity: identity.clone(),
};

executor.dispatch({
async move {
tokio::select! {
_ = token.cancelled() => {}
_ = task.run() => {}
}
}
});
let _handle = executor.spawn_abortable(task.run());

Self {
inner,
_task_cancellation: Arc::new(drop_guard),
}
Self { inner, _handle }
}
}

Expand Down
19 changes: 4 additions & 15 deletions extensions/warp-ipfs/src/store/queue.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
use futures::{channel::mpsc, StreamExt, TryFutureExt};

use crate::rt::{Executor, LocalExecutor};
use crate::rt::{AbortableJoinHandle, Executor, LocalExecutor};
use crate::store::{
ds_key::DataStoreKey, ecdh_encrypt, payload::PayloadBuilder, topics::PeerTopic, PeerIdExt,
};
use ipld_core::cid::Cid;
use rust_ipfs::{Ipfs, Keypair};
use std::{collections::HashMap, sync::Arc, time::Duration};
use tokio::sync::RwLock;
use tokio_util::sync::{CancellationToken, DropGuard};
use warp::{crypto::DID, error::Error};
use web_time::Instant;

Expand Down Expand Up @@ -228,7 +227,7 @@ pub struct QueueEntry {
recipient: DID,
keypair: Keypair,
item: RequestResponsePayload,
drop_guard: Arc<RwLock<Option<DropGuard>>>,
drop_guard: Arc<RwLock<Option<AbortableJoinHandle<()>>>>,
executor: LocalExecutor,
}

Expand All @@ -249,9 +248,6 @@ impl QueueEntry {
executor: LocalExecutor,
};

let token = CancellationToken::new();
let drop_guard = token.clone().drop_guard();

let fut = {
let entry = entry.clone();
async move {
Expand Down Expand Up @@ -332,16 +328,9 @@ impl QueueEntry {
}
};

entry.executor.dispatch(async move {
futures::pin_mut!(fut);

tokio::select! {
_ = token.cancelled() => {}
_ = &mut fut => {}
}
});
let _handle = entry.executor.spawn_abortable(fut);

*entry.drop_guard.write().await = Some(drop_guard);
*entry.drop_guard.write().await = Some(_handle);

entry
}
Expand Down

0 comments on commit df3edf9

Please sign in to comment.