Skip to content

Commit

Permalink
refactor!: avoid using futures crate directly (#2117)
Browse files Browse the repository at this point in the history
Due to the low maintenance and absurd high amount of `unsafe` code in
parts of the `futures` crate, we are trying to avoid usage of it.

Usages are replaced with

- `futures-lite` : general `Future` and `Stream` tooling
- `futures-sink`: `Sink` trait
- `futures-buffered`: faster and safer version of `Futures{Un}Ordered`
- If must be `futures-util` for sink specific things that are missing

## Breaking Changes

- `iroh::node::Node` does not implement `Future` anymore
- `iroh::node::Node::shutdown()` is now `async` and can be awaited upon
to wait for the node to exit
- `iroh_net::util::AbortingJoinHandle`s inner field is not public
anymore, use the `From<JoinHandle>` implementation to contruct it


## Followups

- [x] Apply this to `bao-tree`:
n0-computer/bao-tree#49
- [x] Apply this to `iroh-io`:
n0-computer/iroh-io#6
- [x] Apply this to `quic-rpc`:
n0-computer/quic-rpc#73

---------

Co-authored-by: Ruediger Klaehn <[email protected]>
  • Loading branch information
dignifiedquire and rklaehn authored Apr 29, 2024
1 parent a26a350 commit b91b684
Show file tree
Hide file tree
Showing 87 changed files with 607 additions and 533 deletions.
285 changes: 164 additions & 121 deletions Cargo.lock

Large diffs are not rendered by default.

11 changes: 7 additions & 4 deletions iroh-bytes/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ bytes = { version = "1.4", features = ["serde"] }
chrono = "0.4.31"
derive_more = { version = "1.0.0-beta.1", features = ["debug", "display", "deref", "deref_mut", "from", "try_into", "into"] }
flume = "0.11"
futures = "0.3.25"
futures-buffered = "0.2.4"
futures-lite = "2.3"
genawaiter = { version = "0.99.1", features = ["futures03"] }
hashlink = { version = "0.9.0", optional = true }
hex = "0.4.3"
Expand Down Expand Up @@ -54,6 +54,7 @@ tracing-futures = "0.2.5"
http-body = "0.4.5"
iroh-bytes = { path = ".", features = ["downloader"] }
iroh-test = { path = "../iroh-test" }
futures-buffered = "0.2.4"
proptest = "1.0.0"
serde_json = "1.0.107"
serde_test = "1.0.176"
Expand All @@ -62,12 +63,14 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] }
rcgen = "0.12.0"
rustls = { version = "0.21.11", default-features = false, features = ["quic"] }
tempfile = "3.10.0"
futures-util = "0.3.30"

[features]
default = ["fs-store"]
downloader = ["iroh-net", "parking_lot", "tokio-util/time", "hashlink"]
fs-store = ["reflink-copy", "redb", "redb_v1", "tempfile"]
metrics = ["iroh-metrics"]
downloader = ["dep:iroh-net", "dep:parking_lot", "tokio-util/time", "dep:hashlink"]
fs-store = ["dep:reflink-copy", "redb", "dep:redb_v1", "dep:tempfile"]
metrics = ["dep:iroh-metrics"]
redb = ["dep:redb"]

[[example]]
name = "provide-bytes"
Expand Down
2 changes: 1 addition & 1 deletion iroh-bytes/examples/fetch-stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::io;

use bao_tree::io::fsm::BaoContentItem;
use bytes::Bytes;
use futures::{Stream, StreamExt};
use futures_lite::{Stream, StreamExt};
use genawaiter::sync::Co;
use genawaiter::sync::Gen;
use tokio::io::AsyncWriteExt;
Expand Down
4 changes: 2 additions & 2 deletions iroh-bytes/examples/provide-bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,10 @@ async fn main() -> Result<()> {
#[derive(Clone)]
struct MockEventSender;

use futures::future::FutureExt;
use futures_lite::future::FutureExt;

impl iroh_bytes::provider::EventSender for MockEventSender {
fn send(&self, _event: iroh_bytes::provider::Event) -> futures::future::BoxFuture<()> {
fn send(&self, _event: iroh_bytes::provider::Event) -> futures_lite::future::Boxed<()> {
async move {}.boxed()
}
}
10 changes: 4 additions & 6 deletions iroh-bytes/src/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use std::{
time::Duration,
};

use futures::{future::LocalBoxFuture, FutureExt, StreamExt};
use futures_lite::{future::BoxedLocal, Stream, StreamExt};
use hashlink::LinkedHashSet;
use iroh_base::hash::{BlobFormat, Hash, HashAndFormat};
use iroh_net::{MagicEndpoint, NodeAddr, NodeId};
Expand Down Expand Up @@ -72,9 +72,7 @@ const SERVICE_CHANNEL_CAPACITY: usize = 128;
pub struct IntentId(pub u64);

/// Trait modeling a dialer. This allows for IO-less testing.
pub trait Dialer:
futures::Stream<Item = (NodeId, anyhow::Result<Self::Connection>)> + Unpin
{
pub trait Dialer: Stream<Item = (NodeId, anyhow::Result<Self::Connection>)> + Unpin {
/// Type of connections returned by the Dialer.
type Connection: Clone;
/// Dial a node.
Expand All @@ -99,7 +97,7 @@ pub enum FailureAction {
}

/// Future of a get request.
type GetFut = LocalBoxFuture<'static, InternalDownloadResult>;
type GetFut = BoxedLocal<InternalDownloadResult>;

/// Trait modelling performing a single request over a connection. This allows for IO-less testing.
pub trait Getter {
Expand Down Expand Up @@ -307,7 +305,7 @@ impl std::future::Future for DownloadHandle {
use std::task::Poll::*;
// make it easier on holders of the handle to poll the result, removing the receiver error
// from the middle
match self.receiver.poll_unpin(cx) {
match std::pin::Pin::new(&mut self.receiver).poll(cx) {
Ready(Ok(result)) => Ready(result),
Ready(Err(_recv_err)) => Ready(Err(DownloadError::ActorClosed)),
Pending => Pending,
Expand Down
2 changes: 1 addition & 1 deletion iroh-bytes/src/downloader/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
get::{db::get_to_db, error::GetError},
store::Store,
};
use futures::FutureExt;
use futures_lite::FutureExt;
#[cfg(feature = "metrics")]
use iroh_metrics::{inc, inc_by};

Expand Down
2 changes: 1 addition & 1 deletion iroh-bytes/src/downloader/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ impl ProgressSender for BroadcastProgressSender {
futs
};

let failed_senders = futures::future::join_all(futs).await;
let failed_senders = futures_buffered::join_all(futs).await;
// remove senders where the receiver is dropped
if failed_senders.iter().any(|s| s.is_some()) {
let mut inner = self.shared.lock();
Expand Down
12 changes: 6 additions & 6 deletions iroh-bytes/src/downloader/test.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
#![cfg(test)]
use anyhow::anyhow;
use futures::FutureExt;
use std::{
sync::atomic::AtomicUsize,
time::{Duration, Instant},
};

use futures_util::future::FutureExt;
use iroh_net::key::SecretKey;

use crate::{
Expand Down Expand Up @@ -101,7 +101,7 @@ async fn deduplication() {
handles.push(h);
}
assert!(
futures::future::join_all(handles)
futures_buffered::join_all(handles)
.await
.into_iter()
.all(|r| r.is_ok()),
Expand Down Expand Up @@ -175,7 +175,7 @@ async fn max_concurrent_requests_total() {
}

assert!(
futures::future::join_all(handles)
futures_buffered::join_all(handles)
.await
.into_iter()
.all(|r| r.is_ok()),
Expand Down Expand Up @@ -215,7 +215,7 @@ async fn max_concurrent_requests_per_peer() {
handles.push(h);
}

futures::future::join_all(handles).await;
futures_buffered::join_all(handles).await;
}

/// Tests concurrent progress reporting for multiple intents.
Expand Down Expand Up @@ -301,7 +301,7 @@ async fn concurrent_progress() {

done_tx.send(()).unwrap();

let (res_a, res_b, res_c) = futures::future::join3(handle_a, handle_b, handle_c).await;
let (res_a, res_b, res_c) = tokio::join!(handle_a, handle_b, handle_c);
res_a.unwrap();
res_b.unwrap();
res_c.unwrap();
Expand Down Expand Up @@ -359,7 +359,7 @@ async fn long_queue() {
handles.push(h);
}

let res = futures::future::join_all(handles).await;
let res = futures_buffered::join_all(handles).await;
for res in res {
res.expect("all downloads to succeed");
}
Expand Down
2 changes: 1 addition & 1 deletion iroh-bytes/src/downloader/test/dialer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl Dialer for TestingDialer {
}
}

impl futures::Stream for TestingDialer {
impl Stream for TestingDialer {
type Item = (NodeId, anyhow::Result<NodeId>);

fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Expand Down
7 changes: 3 additions & 4 deletions iroh-bytes/src/downloader/test/getter.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
//! Implementation of [`super::Getter`] used for testing.
use std::{sync::Arc, time::Duration};

use futures::future::BoxFuture;
use futures_lite::{future::Boxed as BoxFuture, FutureExt};
use parking_lot::RwLock;
use std::{sync::Arc, time::Duration};

use super::*;

Expand All @@ -16,7 +15,7 @@ pub(super) type RequestHandlerFn = Arc<
NodeId,
BroadcastProgressSender,
Duration,
) -> BoxFuture<'static, InternalDownloadResult>
) -> BoxFuture<InternalDownloadResult>
+ Send
+ Sync
+ 'static,
Expand Down
19 changes: 10 additions & 9 deletions iroh-bytes/src/get/db.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
//! Functions that use the iroh-bytes protocol in conjunction with a bao store.
use bao_tree::ChunkNum;
use futures::{Future, StreamExt};
use std::future::Future;
use std::io;
use std::num::NonZeroU64;

use futures_lite::StreamExt;
use iroh_base::hash::Hash;
use iroh_base::rpc::RpcError;
use serde::{Deserialize, Serialize};

use crate::hashseq::parse_hash_seq;
use crate::protocol::RangeSpec;
use crate::store::BaoBatchWriter;
use crate::store::BaoBlobSize;
use crate::store::FallibleProgressBatchWriter;
use std::io;
use std::num::NonZeroU64;

use crate::hashseq::parse_hash_seq;
use crate::store::BaoBatchWriter;

use crate::{
get::{
Expand All @@ -28,7 +29,7 @@ use crate::{
BlobFormat, HashAndFormat,
};
use anyhow::anyhow;
use bao_tree::ChunkRanges;
use bao_tree::{ChunkNum, ChunkRanges};
use iroh_io::AsyncSliceReader;
use tracing::trace;

Expand Down Expand Up @@ -294,7 +295,7 @@ pub async fn blob_info<D: BaoStore>(db: &D, hash: &Hash) -> io::Result<BlobInfo<

/// Like `get_blob_info`, but for multiple hashes
async fn blob_infos<D: BaoStore>(db: &D, hash_seq: &[Hash]) -> io::Result<Vec<BlobInfo<D>>> {
let items = futures::stream::iter(hash_seq)
let items = futures_lite::stream::iter(hash_seq)
.then(|hash| blob_info(db, hash))
.collect::<Vec<_>>();
items.await.into_iter().collect()
Expand Down
2 changes: 1 addition & 1 deletion iroh-bytes/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::time::Duration;
use anyhow::{Context, Result};
use bao_tree::io::fsm::{encode_ranges_validated, Outboard};
use bao_tree::io::EncodeError;
use futures::future::BoxFuture;
use futures_lite::future::Boxed as BoxFuture;
use iroh_base::rpc::RpcError;
use iroh_io::stats::{
SliceReaderStats, StreamWriterStats, TrackingSliceReader, TrackingStreamWriter,
Expand Down
9 changes: 5 additions & 4 deletions iroh-bytes/src/store/bao_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -720,7 +720,7 @@ impl BaoBatchWriter for BaoFileWriter {

#[cfg(test)]
pub mod test_support {
use std::{io::Cursor, ops::Range};
use std::{future::Future, io::Cursor, ops::Range};

use bao_tree::{
io::{
Expand All @@ -731,7 +731,8 @@ pub mod test_support {
},
BlockSize, ChunkRanges,
};
use futures::{Future, Stream, StreamExt};
use futures_lite::{Stream, StreamExt};
use iroh_base::hash::Hash;
use iroh_io::AsyncStreamReader;
use rand::RngCore;
use range_collections::RangeSet2;
Expand Down Expand Up @@ -853,7 +854,7 @@ pub mod test_support {
.chunks(mtu)
.map(Bytes::copy_from_slice)
.collect::<Vec<_>>();
futures::stream::iter(parts).then(move |part| async move {
futures_lite::stream::iter(parts).then(move |part| async move {
tokio::time::sleep(delay).await;
part
})
Expand All @@ -872,7 +873,7 @@ mod tests {
use std::io::Write;

use bao_tree::{blake3, ChunkNum, ChunkRanges};
use futures::StreamExt;
use futures_lite::StreamExt;
use iroh_io::TokioStreamReader;
use tests::test_support::{
decode_response_into_batch, local, make_wire_data, random_test_data, trickle, validate,
Expand Down
6 changes: 3 additions & 3 deletions iroh-bytes/src/store/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,14 @@ use bao_tree::io::{
sync::{ReadAt, Size},
};
use bytes::Bytes;
use futures::{channel::oneshot, Stream, StreamExt};
use futures_lite::{Stream, StreamExt};

use iroh_base::hash::{BlobFormat, Hash, HashAndFormat};
use iroh_io::AsyncSliceReader;
use redb::{AccessGuard, DatabaseError, ReadableTable, StorageError};
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use tokio::io::AsyncWriteExt;
use tokio::{io::AsyncWriteExt, sync::oneshot};
use tracing::trace_span;

mod import_flat_store;
Expand Down Expand Up @@ -1250,7 +1250,7 @@ pub(crate) enum OuterError {
#[error("progress send error: {0}")]
ProgressSend(#[from] ProgressSendError),
#[error("recv error: {0}")]
Recv(#[from] oneshot::Canceled),
Recv(#[from] oneshot::error::RecvError),
#[error("recv error: {0}")]
FlumeRecv(#[from] flume::RecvError),
#[error("join error: {0}")]
Expand Down
2 changes: 1 addition & 1 deletion iroh-bytes/src/store/fs/test_support.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{
path::{Path, PathBuf},
};

use futures::channel::oneshot;
use tokio::sync::oneshot;

use super::{
tables::{ReadableTables, Tables},
Expand Down
2 changes: 1 addition & 1 deletion iroh-bytes/src/store/fs/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub fn to_stream(
.chunks(mtu)
.map(Bytes::copy_from_slice)
.collect::<Vec<_>>();
futures::stream::iter(parts)
futures_lite::stream::iter(parts)
.then(move |part| async move {
tokio::time::sleep(delay).await;
io::Result::Ok(part)
Expand Down
2 changes: 1 addition & 1 deletion iroh-bytes/src/store/mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use bao_tree::{
BaoTree,
};
use bytes::{Bytes, BytesMut};
use futures::{Stream, StreamExt};
use futures_lite::{Stream, StreamExt};
use iroh_base::hash::{BlobFormat, Hash, HashAndFormat};
use iroh_io::AsyncSliceReader;
use std::{
Expand Down
2 changes: 1 addition & 1 deletion iroh-bytes/src/store/readonly_mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use bao_tree::{
io::{outboard::PreOrderMemOutboard, sync::Outboard},
};
use bytes::Bytes;
use futures::Stream;
use futures_lite::Stream;
use iroh_io::AsyncSliceReader;
use tokio::io::AsyncWriteExt;

Expand Down
8 changes: 4 additions & 4 deletions iroh-bytes/src/store/traits.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
//! Traits for in-memory or persistent maps of blob with bao encoded outboards.
use std::{collections::BTreeSet, io, path::PathBuf};
use std::{collections::BTreeSet, future::Future, io, path::PathBuf};

use bao_tree::{
io::fsm::{BaoContentItem, Outboard},
BaoTree, ChunkRanges,
};
use bytes::Bytes;
use futures::{Future, Stream, StreamExt};
use futures_lite::{Stream, StreamExt};
use genawaiter::rc::{Co, Gen};
use iroh_base::rpc::RpcError;
use iroh_io::AsyncSliceReader;
Expand Down Expand Up @@ -433,7 +433,7 @@ async fn validate_impl(
total: complete.len() as u64,
})
.await?;
let complete_result = futures::stream::iter(complete)
let complete_result = futures_lite::stream::iter(complete)
.map(|hash| {
let store = store.clone();
let tx = tx.clone();
Expand Down Expand Up @@ -482,7 +482,7 @@ async fn validate_impl(
.buffered_unordered(validate_parallelism)
.collect::<Vec<_>>()
.await;
let partial_result = futures::stream::iter(partial)
let partial_result = futures_lite::stream::iter(partial)
.map(|hash| {
let store = store.clone();
let tx = tx.clone();
Expand Down
Loading

0 comments on commit b91b684

Please sign in to comment.