From e9c5088c60862368113fe117d4a1d47d20b7c4ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=BCdiger=20Klaehn?= Date: Wed, 14 Aug 2024 17:47:00 +0300 Subject: [PATCH] refactor(iroh-bytes)!: remove flume dependency (#2622) ## Description This is mostly a pretty mechanical replacement of flume with async_channel in the rebd blob store. One not so tiny change is that to support closing a read or write transaction after some delay, and because async_channel does not have recv_timeout, we switch the entire actor loop to run *async* but on a dedicated thread of the main runtime. That way we can use `tokio::select! {..., timeout}` ## Breaking Changes FlumeProgressSender was removed ## Notes & open questions ## Change checklist - [ ] Self-review. - [ ] Documentation updates following the [style guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text), if relevant. - [ ] Tests if relevant. - [ ] All breaking changes documented. --- Cargo.lock | 2 - iroh-blobs/Cargo.toml | 1 - iroh-blobs/src/store/fs.rs | 166 +++++++++++++----------- iroh-blobs/src/store/fs/test_support.rs | 61 +++++---- iroh-blobs/src/store/fs/util.rs | 90 +------------ iroh-blobs/src/util/progress.rs | 75 +---------- iroh/Cargo.toml | 1 - 7 files changed, 131 insertions(+), 265 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f229a55ce6..88509e7f8a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2484,7 +2484,6 @@ dependencies = [ "clap", "console", "derive_more", - "flume", "futures-buffered", "futures-lite 2.3.0", "futures-util", @@ -2583,7 +2582,6 @@ dependencies = [ "bytes", "chrono", "derive_more", - "flume", "futures-buffered", "futures-lite 2.3.0", "futures-util", diff --git a/iroh-blobs/Cargo.toml b/iroh-blobs/Cargo.toml index 040da24aa2..aa6ecf67d4 100644 --- a/iroh-blobs/Cargo.toml +++ b/iroh-blobs/Cargo.toml @@ -22,7 +22,6 @@ bao-tree = { version = "0.13", features = ["tokio_fsm", "validate"], default-fe bytes = { version = "1.4", features = ["serde"] } chrono = "0.4.31" derive_more = { version = "=1.0.0-beta.7", features = ["debug", "display", "deref", "deref_mut", "from", "try_into", "into"] } -flume = "0.11" futures-buffered = "0.2.4" futures-lite = "2.3" genawaiter = { version = "0.99.1", features = ["futures03"] } diff --git a/iroh-blobs/src/store/fs.rs b/iroh-blobs/src/store/fs.rs index e5201827b1..78ca6dd933 100644 --- a/iroh-blobs/src/store/fs.rs +++ b/iroh-blobs/src/store/fs.rs @@ -534,25 +534,25 @@ pub(crate) enum ActorMessage { /// Query method: get the rough entry status for a hash. Just complete, partial or not found. EntryStatus { hash: Hash, - tx: flume::Sender>, + tx: async_channel::Sender>, }, #[cfg(test)] /// Query method: get the full entry state for a hash, both in memory and in redb. /// This is everything we got about the entry, including the actual inline outboard and data. EntryState { hash: Hash, - tx: flume::Sender>, + tx: async_channel::Sender>, }, /// Query method: get the full entry state for a hash. GetFullEntryState { hash: Hash, - tx: flume::Sender>>, + tx: async_channel::Sender>>, }, /// Modification method: set the full entry state for a hash. SetFullEntryState { hash: Hash, entry: Option, - tx: flume::Sender>, + tx: async_channel::Sender>, }, /// Modification method: get or create a file handle for a hash. /// @@ -575,7 +575,7 @@ pub(crate) enum ActorMessage { /// At this point the size, hash and outboard must already be known. Import { cmd: Import, - tx: flume::Sender>, + tx: async_channel::Sender>, }, /// Modification method: export data from a redb store /// @@ -772,7 +772,7 @@ impl Store { #[derive(Debug)] struct StoreInner { - tx: flume::Sender, + tx: async_channel::Sender, temp: Arc>, handle: Option>, path_options: Arc, @@ -808,13 +808,15 @@ impl StoreInner { ); std::fs::create_dir_all(path.parent().unwrap())?; let temp: Arc> = Default::default(); - let (actor, tx) = Actor::new(&path, options.clone(), temp.clone(), rt)?; + let (actor, tx) = Actor::new(&path, options.clone(), temp.clone(), rt.clone())?; let handle = std::thread::Builder::new() .name("redb-actor".to_string()) .spawn(move || { - if let Err(cause) = actor.run_batched() { - tracing::error!("redb actor failed: {}", cause); - } + rt.block_on(async move { + if let Err(cause) = actor.run_batched().await { + tracing::error!("redb actor failed: {}", cause); + } + }); }) .expect("failed to spawn thread"); Ok(Self { @@ -827,15 +829,13 @@ impl StoreInner { pub async fn get(&self, hash: Hash) -> OuterResult> { let (tx, rx) = oneshot::channel(); - self.tx.send_async(ActorMessage::Get { hash, tx }).await?; + self.tx.send(ActorMessage::Get { hash, tx }).await?; Ok(rx.await??) } async fn get_or_create(&self, hash: Hash) -> OuterResult { let (tx, rx) = oneshot::channel(); - self.tx - .send_async(ActorMessage::GetOrCreate { hash, tx }) - .await?; + self.tx.send(ActorMessage::GetOrCreate { hash, tx }).await?; Ok(rx.await??) } @@ -849,9 +849,7 @@ impl StoreInner { None } }); - self.tx - .send_async(ActorMessage::Blobs { filter, tx }) - .await?; + self.tx.send(ActorMessage::Blobs { filter, tx }).await?; let blobs = rx.await?; let res = blobs? .into_iter() @@ -873,9 +871,7 @@ impl StoreInner { None } }); - self.tx - .send_async(ActorMessage::Blobs { filter, tx }) - .await?; + self.tx.send(ActorMessage::Blobs { filter, tx }).await?; let blobs = rx.await?; let res = blobs? .into_iter() @@ -891,9 +887,7 @@ impl StoreInner { let (tx, rx) = oneshot::channel(); let filter: FilterPredicate = Box::new(|_i, k, v| Some((k.value(), v.value()))); - self.tx - .send_async(ActorMessage::Tags { filter, tx }) - .await?; + self.tx.send(ActorMessage::Tags { filter, tx }).await?; let tags = rx.await?; // transform the internal error type into io::Error let tags = tags? @@ -906,51 +900,47 @@ impl StoreInner { async fn set_tag(&self, tag: Tag, value: Option) -> OuterResult<()> { let (tx, rx) = oneshot::channel(); self.tx - .send_async(ActorMessage::SetTag { tag, value, tx }) + .send(ActorMessage::SetTag { tag, value, tx }) .await?; Ok(rx.await??) } async fn create_tag(&self, hash: HashAndFormat) -> OuterResult { let (tx, rx) = oneshot::channel(); - self.tx - .send_async(ActorMessage::CreateTag { hash, tx }) - .await?; + self.tx.send(ActorMessage::CreateTag { hash, tx }).await?; Ok(rx.await??) } async fn delete(&self, hashes: Vec) -> OuterResult<()> { let (tx, rx) = oneshot::channel(); - self.tx - .send_async(ActorMessage::Delete { hashes, tx }) - .await?; + self.tx.send(ActorMessage::Delete { hashes, tx }).await?; Ok(rx.await??) } async fn gc_start(&self) -> OuterResult<()> { let (tx, rx) = oneshot::channel(); - self.tx.send_async(ActorMessage::GcStart { tx }).await?; + self.tx.send(ActorMessage::GcStart { tx }).await?; Ok(rx.await?) } async fn entry_status(&self, hash: &Hash) -> OuterResult { - let (tx, rx) = flume::bounded(1); + let (tx, rx) = async_channel::bounded(1); self.tx - .send_async(ActorMessage::EntryStatus { hash: *hash, tx }) + .send(ActorMessage::EntryStatus { hash: *hash, tx }) .await?; - Ok(rx.into_recv_async().await??) + Ok(rx.recv().await??) } fn entry_status_sync(&self, hash: &Hash) -> OuterResult { - let (tx, rx) = flume::bounded(1); + let (tx, rx) = async_channel::bounded(1); self.tx - .send(ActorMessage::EntryStatus { hash: *hash, tx })?; - Ok(rx.recv()??) + .send_blocking(ActorMessage::EntryStatus { hash: *hash, tx })?; + Ok(rx.recv_blocking()??) } async fn complete(&self, entry: Entry) -> OuterResult<()> { self.tx - .send_async(ActorMessage::OnComplete { handle: entry }) + .send(ActorMessage::OnComplete { handle: entry }) .await?; Ok(()) } @@ -985,7 +975,7 @@ impl StoreInner { let temp_tag = self.temp.temp_tag(HashAndFormat::raw(hash)); let (tx, rx) = oneshot::channel(); self.tx - .send_async(ActorMessage::Export { + .send(ActorMessage::Export { cmd: Export { temp_tag, target, @@ -1005,7 +995,7 @@ impl StoreInner { ) -> OuterResult<()> { let (tx, rx) = oneshot::channel(); self.tx - .send_async(ActorMessage::Fsck { + .send(ActorMessage::Fsck { repair, progress, tx, @@ -1017,7 +1007,7 @@ impl StoreInner { async fn import_flat_store(&self, paths: FlatStorePaths) -> OuterResult { let (tx, rx) = oneshot::channel(); self.tx - .send_async(ActorMessage::ImportFlatStore { paths, tx }) + .send(ActorMessage::ImportFlatStore { paths, tx }) .await?; Ok(rx.await?) } @@ -1029,7 +1019,7 @@ impl StoreInner { ) -> OuterResult<()> { let (tx, rx) = oneshot::channel(); self.tx - .send_async(ActorMessage::UpdateInlineOptions { + .send(ActorMessage::UpdateInlineOptions { inline_options, reapply, tx, @@ -1039,13 +1029,13 @@ impl StoreInner { } async fn dump(&self) -> OuterResult<()> { - self.tx.send_async(ActorMessage::Dump).await?; + self.tx.send(ActorMessage::Dump).await?; Ok(()) } async fn sync(&self) -> OuterResult<()> { let (tx, rx) = oneshot::channel(); - self.tx.send_async(ActorMessage::Sync { tx }).await?; + self.tx.send(ActorMessage::Sync { tx }).await?; Ok(rx.await?) } @@ -1141,8 +1131,8 @@ impl StoreInner { let tag = self.temp.temp_tag(HashAndFormat { hash, format }); let hash = *tag.hash(); // blocking send for the import - let (tx, rx) = flume::bounded(1); - self.tx.send(ActorMessage::Import { + let (tx, rx) = async_channel::bounded(1); + self.tx.send_blocking(ActorMessage::Import { cmd: Import { content_id: HashAndFormat { hash, format }, source: file, @@ -1151,7 +1141,7 @@ impl StoreInner { }, tx, })?; - Ok(rx.recv()??) + Ok(rx.recv_blocking()??) } fn temp_file_name(&self) -> PathBuf { @@ -1161,7 +1151,7 @@ impl StoreInner { async fn shutdown(&self) { let (tx, rx) = oneshot::channel(); self.tx - .send_async(ActorMessage::Shutdown { tx: Some(tx) }) + .send(ActorMessage::Shutdown { tx: Some(tx) }) .await .ok(); rx.await.ok(); @@ -1171,7 +1161,9 @@ impl StoreInner { impl Drop for StoreInner { fn drop(&mut self) { if let Some(handle) = self.handle.take() { - self.tx.send(ActorMessage::Shutdown { tx: None }).ok(); + self.tx + .send_blocking(ActorMessage::Shutdown { tx: None }) + .ok(); handle.join().ok(); } } @@ -1181,7 +1173,7 @@ struct ActorState { handles: BTreeMap, protected: BTreeSet, temp: Arc>, - msgs: flume::Receiver, + msgs_rx: async_channel::Receiver, create_options: Arc, options: Options, rt: tokio::runtime::Handle, @@ -1243,13 +1235,13 @@ pub(crate) enum OuterError { #[error("inner error: {0}")] Inner(#[from] ActorError), #[error("send error: {0}")] - Send(#[from] flume::SendError), + Send(#[from] async_channel::SendError), #[error("progress send error: {0}")] ProgressSend(#[from] ProgressSendError), #[error("recv error: {0}")] Recv(#[from] oneshot::error::RecvError), #[error("recv error: {0}")] - FlumeRecv(#[from] flume::RecvError), + AsyncChannelRecv(#[from] async_channel::RecvError), #[error("join error: {0}")] JoinTask(#[from] tokio::task::JoinError), } @@ -1434,7 +1426,7 @@ impl Actor { options: Options, temp: Arc>, rt: tokio::runtime::Handle, - ) -> ActorResult<(Self, flume::Sender)> { + ) -> ActorResult<(Self, async_channel::Sender)> { let db = match redb::Database::create(path) { Ok(db) => db, Err(DatabaseError::UpgradeRequired(1)) => { @@ -1451,11 +1443,11 @@ impl Actor { txn.commit()?; // make the channel relatively large. there are some messages that don't // require a response, it's fine if they pile up a bit. - let (tx, rx) = flume::bounded(1024); + let (tx, rx) = async_channel::bounded(1024); let tx2 = tx.clone(); let on_file_create: CreateCb = Arc::new(move |hash| { // todo: make the callback allow async - tx2.send(ActorMessage::OnMemSizeExceeded { hash: *hash }) + tx2.send_blocking(ActorMessage::OnMemSizeExceeded { hash: *hash }) .ok(); Ok(()) }); @@ -1471,7 +1463,7 @@ impl Actor { temp, handles: BTreeMap::new(), protected: BTreeSet::new(), - msgs: rx, + msgs_rx: rx, options, create_options: Arc::new(create_options), rt, @@ -1481,9 +1473,9 @@ impl Actor { )) } - fn run_batched(mut self) -> ActorResult<()> { - let mut msgs = PeekableFlumeReceiver::new(self.state.msgs.clone()); - while let Some(msg) = msgs.recv() { + async fn run_batched(mut self) -> ActorResult<()> { + let mut msgs = PeekableFlumeReceiver::new(self.state.msgs_rx.clone()); + while let Some(msg) = msgs.recv().await { if let ActorMessage::Shutdown { tx } = msg { // Make sure the database is dropped before we send the reply. drop(self); @@ -1502,11 +1494,24 @@ impl Actor { let txn = self.db.begin_read()?; let tables = ReadOnlyTables::new(&txn)?; let count = self.state.options.batch.max_read_batch; - let timeout = self.state.options.batch.max_read_duration; - for msg in msgs.batch_iter(count, timeout) { - if let Err(msg) = self.state.handle_readonly(&tables, msg)? { - msgs.push_back(msg).expect("just recv'd"); - break; + let timeout = tokio::time::sleep(self.state.options.batch.max_read_duration); + tokio::pin!(timeout); + for _ in 0..count { + tokio::select! { + msg = msgs.recv() => { + if let Some(msg) = msg { + if let Err(msg) = self.state.handle_readonly(&tables, msg)? { + msgs.push_back(msg).expect("just recv'd"); + break; + } + } else { + break; + } + } + _ = &mut timeout => { + tracing::debug!("read transaction timed out"); + break; + } } } tracing::debug!("done with read transaction"); @@ -1518,11 +1523,24 @@ impl Actor { let mut delete_after_commit = Default::default(); let mut tables = Tables::new(&txn, &mut delete_after_commit)?; let count = self.state.options.batch.max_write_batch; - let timeout = self.state.options.batch.max_write_duration; - for msg in msgs.batch_iter(count, timeout) { - if let Err(msg) = self.state.handle_readwrite(&mut tables, msg)? { - msgs.push_back(msg).expect("just recv'd"); - break; + let timeout = tokio::time::sleep(self.state.options.batch.max_read_duration); + tokio::pin!(timeout); + for _ in 0..count { + tokio::select! { + msg = msgs.recv() => { + if let Some(msg) = msg { + if let Err(msg) = self.state.handle_readwrite(&mut tables, msg)? { + msgs.push_back(msg).expect("just recv'd"); + break; + } + } else { + break; + } + } + _ = &mut timeout => { + tracing::debug!("write transaction timed out"); + break; + } } } drop(tables); @@ -2217,7 +2235,7 @@ impl ActorState { } ActorMessage::EntryStatus { hash, tx } => { let res = self.entry_status(tables, hash); - tx.send(res).ok(); + tx.send_blocking(res).ok(); } ActorMessage::Blobs { filter, tx } => { let res = self.blobs(tables, filter); @@ -2237,11 +2255,11 @@ impl ActorState { } #[cfg(test)] ActorMessage::EntryState { hash, tx } => { - tx.send(self.entry_state(tables, hash)).ok(); + tx.send_blocking(self.entry_state(tables, hash)).ok(); } ActorMessage::GetFullEntryState { hash, tx } => { let res = self.get_full_entry_state(tables, hash); - tx.send(res).ok(); + tx.send_blocking(res).ok(); } x => return Ok(Err(x)), } @@ -2256,7 +2274,7 @@ impl ActorState { match msg { ActorMessage::Import { cmd, tx } => { let res = self.import(tables, cmd); - tx.send(res).ok(); + tx.send_blocking(res).ok(); } ActorMessage::SetTag { tag, value, tx } => { let res = self.set_tag(tables, tag, value); @@ -2287,7 +2305,7 @@ impl ActorState { } ActorMessage::SetFullEntryState { hash, entry, tx } => { let res = self.set_full_entry_state(tables, hash, entry); - tx.send(res).ok(); + tx.send_blocking(res).ok(); } msg => { // try to handle it as readonly diff --git a/iroh-blobs/src/store/fs/test_support.rs b/iroh-blobs/src/store/fs/test_support.rs index 10dd3530b4..733cba146f 100644 --- a/iroh-blobs/src/store/fs/test_support.rs +++ b/iroh-blobs/src/store/fs/test_support.rs @@ -106,36 +106,32 @@ impl Store { impl StoreInner { #[cfg(test)] async fn entry_state(&self, hash: Hash) -> OuterResult { - let (tx, rx) = flume::bounded(1); - self.tx - .send_async(ActorMessage::EntryState { hash, tx }) - .await?; - Ok(rx.recv_async().await??) + let (tx, rx) = async_channel::bounded(1); + self.tx.send(ActorMessage::EntryState { hash, tx }).await?; + Ok(rx.recv().await??) } async fn set_full_entry_state(&self, hash: Hash, entry: Option) -> OuterResult<()> { - let (tx, rx) = flume::bounded(1); + let (tx, rx) = async_channel::bounded(1); self.tx - .send_async(ActorMessage::SetFullEntryState { hash, entry, tx }) + .send(ActorMessage::SetFullEntryState { hash, entry, tx }) .await?; - Ok(rx.recv_async().await??) + Ok(rx.recv().await??) } async fn get_full_entry_state(&self, hash: Hash) -> OuterResult> { - let (tx, rx) = flume::bounded(1); + let (tx, rx) = async_channel::bounded(1); self.tx - .send_async(ActorMessage::GetFullEntryState { hash, tx }) + .send(ActorMessage::GetFullEntryState { hash, tx }) .await?; - Ok(rx.recv_async().await??) + Ok(rx.recv().await??) } async fn all_blobs(&self) -> OuterResult>> { let (tx, rx) = oneshot::channel(); let filter: FilterPredicate = Box::new(|_i, k, v| Some((k.value(), v.value()))); - self.tx - .send_async(ActorMessage::Blobs { filter, tx }) - .await?; + self.tx.send(ActorMessage::Blobs { filter, tx }).await?; let blobs = rx.await?; let res = blobs? .into_iter() @@ -364,14 +360,19 @@ pub fn make_partial( path: &Path, f: impl Fn(Hash, u64) -> MakePartialResult + Send + Sync, ) -> io::Result<()> { - tokio::runtime::Builder::new_current_thread() - .build()? - .block_on(async move { - let blobs_path = path.join("blobs"); - let store = Store::load(blobs_path).await?; - store - .transform_entries(|hash, entry| match &entry { - EntryData::Complete { data, outboard } => match f(hash, data.len() as u64) { + tracing::info!("starting runtime for make_partial"); + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()?; + rt.block_on(async move { + let blobs_path = path.join("blobs"); + let store = Store::load(blobs_path).await?; + store + .transform_entries(|hash, entry| match &entry { + EntryData::Complete { data, outboard } => { + let res = f(hash, data.len() as u64); + tracing::info!("make_partial: {} {:?}", hash, res); + match res { MakePartialResult::Retain => Some(entry), MakePartialResult::Remove => None, MakePartialResult::Truncate(size) => { @@ -388,10 +389,14 @@ pub fn make_partial( Some(entry) } } - }, - EntryData::Partial { .. } => Some(entry), - }) - .await?; - Ok(()) - }) + } + } + EntryData::Partial { .. } => Some(entry), + }) + .await?; + std::io::Result::Ok(()) + })?; + drop(rt); + tracing::info!("done with make_partial"); + Ok(()) } diff --git a/iroh-blobs/src/store/fs/util.rs b/iroh-blobs/src/store/fs/util.rs index c01515b09d..b747d9d7b5 100644 --- a/iroh-blobs/src/store/fs/util.rs +++ b/iroh-blobs/src/store/fs/util.rs @@ -2,7 +2,6 @@ use std::{ fs::OpenOptions, io::{self, Write}, path::Path, - time::{Duration, Instant}, }; /// overwrite a file with the given data. @@ -47,70 +46,24 @@ pub fn read_and_remove(path: &Path) -> io::Result> { #[derive(Debug)] pub(super) struct PeekableFlumeReceiver { msg: Option, - recv: flume::Receiver, + recv: async_channel::Receiver, } #[allow(dead_code)] impl PeekableFlumeReceiver { - pub fn new(recv: flume::Receiver) -> Self { + pub fn new(recv: async_channel::Receiver) -> Self { Self { msg: None, recv } } - /// Peek at the next message. - /// - /// Will block if there are no messages. - /// Returns None only if there are no more messages (sender is dropped). - pub fn peek(&mut self) -> Option<&T> { - if self.msg.is_none() { - self.msg = self.recv.recv().ok(); - } - self.msg.as_ref() - } - /// Receive the next message. /// /// Will block if there are no messages. /// Returns None only if there are no more messages (sender is dropped). - pub fn recv(&mut self) -> Option { + pub async fn recv(&mut self) -> Option { if let Some(msg) = self.msg.take() { return Some(msg); } - self.recv.recv().ok() - } - - /// Try to peek at the next message. - /// - /// Will not block. - /// Returns None if reading would block, or if there are no more messages (sender is dropped). - pub fn try_peek(&mut self) -> Option<&T> { - if self.msg.is_none() { - self.msg = self.recv.try_recv().ok(); - } - self.msg.as_ref() - } - - /// Try to receive the next message. - /// - /// Will not block. - /// Returns None if reading would block, or if there are no more messages (sender is dropped). - pub fn try_recv(&mut self) -> Option { - if let Some(msg) = self.msg.take() { - return Some(msg); - } - self.recv.try_recv().ok() - } - - pub fn recv_timeout(&mut self, timeout: std::time::Duration) -> Option { - if let Some(msg) = self.msg.take() { - return Some(msg); - } - self.recv.recv_timeout(timeout).ok() - } - - /// Create an iterator that pulls messages from the receiver for at most - /// `count` messages or `max_duration` time. - pub fn batch_iter(&mut self, count: usize, max_duration: Duration) -> BatchIter { - BatchIter::new(self, count, max_duration) + self.recv.recv().await.ok() } /// Push back a message. This will only work if there is room for it. @@ -124,38 +77,3 @@ impl PeekableFlumeReceiver { } } } - -pub(super) struct BatchIter<'a, T> { - recv: &'a mut PeekableFlumeReceiver, - start: Instant, - remaining: usize, - max_duration: Duration, -} - -impl<'a, T> BatchIter<'a, T> { - fn new(recv: &'a mut PeekableFlumeReceiver, count: usize, max_duration: Duration) -> Self { - Self { - recv, - start: Instant::now(), - remaining: count, - max_duration, - } - } -} - -impl<'a, T> Iterator for BatchIter<'a, T> { - type Item = T; - - fn next(&mut self) -> Option { - if self.remaining == 0 { - return None; - } - let elapsed = self.start.elapsed(); - if elapsed >= self.max_duration { - return None; - } - let remaining_time = self.max_duration - elapsed; - self.remaining -= 1; - self.recv.recv_timeout(remaining_time) - } -} diff --git a/iroh-blobs/src/util/progress.rs b/iroh-blobs/src/util/progress.rs index 6f1f678655..a84d46bc74 100644 --- a/iroh-blobs/src/util/progress.rs +++ b/iroh-blobs/src/util/progress.rs @@ -70,9 +70,9 @@ use iroh_io::AsyncSliceWriter; /// /// If you don't want to report progress, you can use the [IgnoreProgressSender] type. /// -/// # Flume progress sender +/// # Async channel progress sender /// -/// If you want to use a flume channel, you can use the [FlumeProgressSender] type. +/// If you want to use an async channel, you can use the [AsyncChannelProgressSender] type. /// /// # Implementing your own progress sender /// @@ -447,77 +447,6 @@ impl< } } -/// A progress sender that uses a flume channel. -pub struct FlumeProgressSender { - sender: flume::Sender, - id: std::sync::Arc, -} - -impl std::fmt::Debug for FlumeProgressSender { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("FlumeProgressSender") - .field("id", &self.id) - .field("sender", &self.sender) - .finish() - } -} - -impl Clone for FlumeProgressSender { - fn clone(&self) -> Self { - Self { - sender: self.sender.clone(), - id: self.id.clone(), - } - } -} - -impl FlumeProgressSender { - /// Create a new progress sender from a flume sender. - pub fn new(sender: flume::Sender) -> Self { - Self { - sender, - id: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)), - } - } - - /// Returns true if `other` sends on the same `flume` channel as `self`. - pub fn same_channel(&self, other: &FlumeProgressSender) -> bool { - self.sender.same_channel(&other.sender) - } -} - -impl IdGenerator for FlumeProgressSender { - fn new_id(&self) -> u64 { - self.id.fetch_add(1, std::sync::atomic::Ordering::SeqCst) - } -} - -impl ProgressSender for FlumeProgressSender { - type Msg = T; - - async fn send(&self, msg: Self::Msg) -> std::result::Result<(), ProgressSendError> { - self.sender - .send_async(msg) - .await - .map_err(|_| ProgressSendError::ReceiverDropped) - } - - fn try_send(&self, msg: Self::Msg) -> std::result::Result<(), ProgressSendError> { - match self.sender.try_send(msg) { - Ok(_) => Ok(()), - Err(flume::TrySendError::Full(_)) => Ok(()), - Err(flume::TrySendError::Disconnected(_)) => Err(ProgressSendError::ReceiverDropped), - } - } - - fn blocking_send(&self, msg: Self::Msg) -> std::result::Result<(), ProgressSendError> { - match self.sender.send(msg) { - Ok(_) => Ok(()), - Err(_) => Err(ProgressSendError::ReceiverDropped), - } - } -} - /// A progress sender that uses an async channel. pub struct AsyncChannelProgressSender { sender: async_channel::Sender, diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml index 3a4264a54a..5d11db97cf 100644 --- a/iroh/Cargo.toml +++ b/iroh/Cargo.toml @@ -21,7 +21,6 @@ async-channel = "2.3.1" bao-tree = { version = "0.13", features = ["tokio_fsm"], default-features = false } bytes = "1" derive_more = { version = "=1.0.0-beta.7", features = ["debug", "display", "from", "try_into", "from_str"] } -flume = "0.11" futures-buffered = "0.2.4" futures-lite = "2.3" futures-util = "0.3"