diff --git a/iroh/src/node.rs b/iroh/src/node.rs index 05227992e8..88665a8c72 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -2,8 +2,6 @@ //! //! A node is a server that serves various protocols. //! -//! You can monitor what is happening in the node using [`Node::subscribe`]. -//! //! To shut down the node, call [`Node::shutdown`]. use std::fmt::Debug; use std::net::SocketAddr; @@ -11,7 +9,7 @@ use std::path::Path; use std::sync::Arc; use anyhow::{anyhow, Result}; -use futures_lite::{future::Boxed as BoxFuture, FutureExt, StreamExt}; +use futures_lite::StreamExt; use iroh_base::key::PublicKey; use iroh_blobs::downloader::Downloader; use iroh_blobs::store::Store as BaoStore; @@ -19,7 +17,6 @@ use iroh_net::util::AbortingJoinHandle; use iroh_net::{endpoint::LocalEndpointsStream, key::SecretKey, Endpoint}; use quic_rpc::transport::flume::FlumeConnection; use quic_rpc::RpcClient; -use tokio::sync::{mpsc, RwLock}; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use tokio_util::task::LocalPoolHandle; @@ -35,38 +32,6 @@ mod rpc_status; pub use self::builder::{Builder, DiscoveryConfig, GcPolicy, StorageConfig}; pub use self::rpc_status::RpcStatus; -type EventCallback = Box BoxFuture<()> + 'static + Sync + Send>; - -#[derive(Default, derive_more::Debug, Clone)] -struct Callbacks(#[debug("..")] Arc>>); - -impl Callbacks { - async fn push(&self, cb: EventCallback) { - self.0.write().await.push(cb); - } - - #[allow(dead_code)] - async fn send(&self, event: Event) { - let cbs = self.0.read().await; - for cb in &*cbs { - cb(event.clone()).await; - } - } -} - -impl iroh_blobs::provider::EventSender for Callbacks { - fn send(&self, event: iroh_blobs::provider::Event) -> BoxFuture<()> { - let this = self.clone(); - async move { - let cbs = this.0.read().await; - for cb in &*cbs { - cb(Event::ByteProvide(event.clone())).await; - } - } - .boxed() - } -} - /// A server which implements the iroh node. /// /// Clients can connect to this server and requests hashes from it. @@ -91,9 +56,6 @@ struct NodeInner { secret_key: SecretKey, cancel_token: CancellationToken, controller: FlumeConnection, - #[debug("callbacks: Sender>")] - cb_sender: mpsc::Sender BoxFuture<()> + Send + Sync + 'static>>, - callbacks: Callbacks, #[allow(dead_code)] gc_task: Option>, #[debug("rt")] @@ -102,15 +64,6 @@ struct NodeInner { downloader: Downloader, } -/// Events emitted by the [`Node`] informing about the current status. -#[derive(Debug, Clone)] -pub enum Event { - /// Events from the iroh-blobs transfer protocol. - ByteProvide(iroh_blobs::provider::Event), - /// Events from database - Db(iroh_blobs::store::Event), -} - /// In memory node. pub type MemNode = Node; @@ -177,18 +130,6 @@ impl Node { self.inner.secret_key.public() } - /// Subscribe to [`Event`]s emitted from the node, informing about connections and - /// progress. - /// - /// Warning: The callback must complete quickly, as otherwise it will block ongoing work. - pub async fn subscribe BoxFuture<()> + Send + Sync + 'static>( - &self, - cb: F, - ) -> Result<()> { - self.inner.cb_sender.send(Box::new(cb)).await?; - Ok(()) - } - /// Returns a handle that can be used to do RPC calls to the node internally. pub fn controller(&self) -> crate::client::MemRpcClient { RpcClient::new(self.inner.controller.clone()) @@ -319,23 +260,7 @@ mod tests { let _drop_guard = node.cancel_token().drop_guard(); - let (r, mut s) = mpsc::channel(1); - node.subscribe(move |event| { - let r = r.clone(); - async move { - if let Event::ByteProvide(iroh_blobs::provider::Event::TaggedBlobAdded { - hash, - .. - }) = event - { - r.send(hash).await.ok(); - } - } - .boxed() - }) - .await?; - - let got_hash = tokio::time::timeout(Duration::from_secs(1), async move { + let _got_hash = tokio::time::timeout(Duration::from_secs(1), async move { let mut stream = node .controller() .server_streaming(BlobAddPathRequest { @@ -364,9 +289,6 @@ mod tests { .context("timeout")? .context("get failed")?; - let event_hash = s.recv().await.expect("missing add tagged blob event"); - assert_eq!(got_hash, event_hash); - Ok(()) } diff --git a/iroh/src/node/builder.rs b/iroh/src/node/builder.rs index addec034d4..61a53f2828 100644 --- a/iroh/src/node/builder.rs +++ b/iroh/src/node/builder.rs @@ -27,19 +27,18 @@ use quic_rpc::{ RpcServer, ServiceEndpoint, }; use serde::{Deserialize, Serialize}; -use tokio::sync::mpsc; use tokio_util::{sync::CancellationToken, task::LocalPoolHandle}; use tracing::{debug, error, error_span, info, trace, warn, Instrument}; use crate::{ client::RPC_ALPN, docs_engine::Engine, - node::{Event, NodeInner}, + node::NodeInner, rpc_protocol::{Request, Response, RpcService}, util::{fs::load_secret_key, path::IrohPaths}, }; -use super::{rpc, rpc_status::RpcStatus, Callbacks, EventCallback, Node}; +use super::{rpc, rpc_status::RpcStatus, Node}; pub const PROTOCOLS: [&[u8]; 3] = [iroh_blobs::protocol::ALPN, GOSSIP_ALPN, DOCS_ALPN]; @@ -69,7 +68,7 @@ const MAX_STREAMS: u64 = 10; /// /// The returned [`Node`] is awaitable to know when it finishes. It can be terminated /// using [`Node::shutdown`]. -#[derive(Debug)] +#[derive(derive_more::Debug)] pub struct Builder where D: Map, @@ -88,6 +87,9 @@ where docs_store: iroh_docs::store::fs::Store, #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: bool, + /// Callback to register when a gc loop is done + #[debug("callback")] + gc_done_callback: Option>, } /// Configuration for storage. @@ -135,6 +137,7 @@ impl Default for Builder { node_discovery: Default::default(), #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: false, + gc_done_callback: None, } } } @@ -160,6 +163,7 @@ impl Builder { node_discovery: Default::default(), #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: false, + gc_done_callback: None, } } } @@ -222,6 +226,7 @@ where node_discovery: self.node_discovery, #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: false, + gc_done_callback: self.gc_done_callback, }) } @@ -242,6 +247,7 @@ where node_discovery: self.node_discovery, #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: self.insecure_skip_relay_cert_verify, + gc_done_callback: self.gc_done_callback, } } @@ -267,6 +273,7 @@ where node_discovery: self.node_discovery, #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: self.insecure_skip_relay_cert_verify, + gc_done_callback: self.gc_done_callback, }) } @@ -337,6 +344,13 @@ where self } + /// Register a callback for when GC is done. + #[cfg(any(test, feature = "test-utils"))] + pub fn register_gc_done_cb(mut self, cb: Box) -> Self { + self.gc_done_callback.replace(cb); + self + } + /// Whether to log the SSL pre-master key. /// /// If `true` and the `SSLKEYLOGFILE` environment variable is the path to a file this @@ -352,7 +366,7 @@ where /// This will create the underlying network server and spawn a tokio task accepting /// connections. The returned [`Node`] can be used to control the task as well as /// get information about it. - pub async fn spawn(self) -> Result> { + pub async fn spawn(mut self) -> Result> { trace!("spawning node"); let lp = LocalPoolHandle::new(num_cpus::get()); @@ -406,7 +420,6 @@ where let endpoint = endpoint.bind(bind_port).await?; trace!("created quinn endpoint"); - let (cb_sender, cb_receiver) = mpsc::channel(8); let cancel_token = CancellationToken::new(); debug!("rpc listening on: {:?}", self.rpc_endpoint.local_addr()); @@ -427,12 +440,13 @@ where ); let sync_db = sync.sync.clone(); - let callbacks = Callbacks::default(); let gc_task = if let GcPolicy::Interval(gc_period) = self.gc_policy { tracing::info!("Starting GC task with interval {:?}", gc_period); let db = self.blobs_store.clone(); - let callbacks = callbacks.clone(); - let task = lp.spawn_pinned(move || Self::gc_loop(db, sync_db, gc_period, callbacks)); + let gc_done_callback = self.gc_done_callback.take(); + + let task = + lp.spawn_pinned(move || Self::gc_loop(db, sync_db, gc_period, gc_done_callback)); Some(task.into()) } else { None @@ -446,8 +460,6 @@ where secret_key: self.secret_key, controller, cancel_token, - callbacks: callbacks.clone(), - cb_sender, gc_task, rt: lp.clone(), sync, @@ -464,8 +476,6 @@ where async move { Self::run( ep, - callbacks, - cb_receiver, handler, self.rpc_endpoint, internal_rpc, @@ -508,8 +518,6 @@ where #[allow(clippy::too_many_arguments)] async fn run( server: Endpoint, - callbacks: Callbacks, - mut cb_receiver: mpsc::Receiver, handler: rpc::Handler, rpc: E, internal_rpc: impl ServiceEndpoint, @@ -586,10 +594,6 @@ where } }); }, - // Handle new callbacks - Some(cb) = cb_receiver.recv() => { - callbacks.push(cb).await; - } else => break, } } @@ -609,7 +613,7 @@ where db: D, ds: iroh_docs::actor::SyncHandle, gc_period: Duration, - callbacks: Callbacks, + done_cb: Option>, ) { let mut live = BTreeSet::new(); tracing::debug!("GC loop starting {:?}", gc_period); @@ -623,14 +627,11 @@ where // do delay before the two phases of GC tokio::time::sleep(gc_period).await; tracing::debug!("Starting GC"); - callbacks - .send(Event::Db(iroh_blobs::store::Event::GcStarted)) - .await; live.clear(); let doc_hashes = match ds.content_hashes().await { Ok(hashes) => hashes, Err(err) => { - tracing::error!("Error getting doc hashes: {}", err); + tracing::warn!("Error getting doc hashes: {}", err); continue 'outer; } }; @@ -680,9 +681,9 @@ where } } } - callbacks - .send(Event::Db(iroh_blobs::store::Event::GcCompleted)) - .await; + if let Some(ref cb) = done_cb { + cb(); + } } } } @@ -719,7 +720,7 @@ async fn handle_connection( iroh_blobs::provider::handle_connection( connection, node.db.clone(), - node.callbacks.clone(), + MockEventSender, node.rt.clone(), ) .await @@ -776,3 +777,12 @@ fn make_rpc_endpoint( Ok((rpc_endpoint, actual_rpc_port)) } + +#[derive(Debug, Clone)] +struct MockEventSender; + +impl iroh_blobs::provider::EventSender for MockEventSender { + fn send(&self, _event: iroh_blobs::provider::Event) -> futures_lite::future::Boxed<()> { + Box::pin(std::future::ready(())) + } +} diff --git a/iroh/src/node/rpc.rs b/iroh/src/node/rpc.rs index f7640f5ff2..0c50f7ed33 100644 --- a/iroh/src/node/rpc.rs +++ b/iroh/src/node/rpc.rs @@ -52,7 +52,7 @@ use crate::rpc_protocol::{ NodeWatchResponse, Request, RpcService, SetTagOption, }; -use super::{Event, NodeInner}; +use super::NodeInner; const HEALTH_POLL_WAIT: Duration = Duration::from_secs(1); /// Chunk size for getting blobs over RPC @@ -761,13 +761,6 @@ impl Handler { tag: tag.clone(), }) .await?; - self.inner - .callbacks - .send(Event::ByteProvide( - iroh_blobs::provider::Event::TaggedBlobAdded { hash, format, tag }, - )) - .await; - Ok(()) } diff --git a/iroh/tests/gc.rs b/iroh/tests/gc.rs index e0899fb2fb..4c3c3fc26f 100644 --- a/iroh/tests/gc.rs +++ b/iroh/tests/gc.rs @@ -6,7 +6,6 @@ use std::{ use anyhow::Result; use bao_tree::{blake3, io::sync::Outboard, ChunkRanges}; use bytes::Bytes; -use futures_lite::FutureExt; use iroh::node::{self, Node}; use rand::RngCore; @@ -38,55 +37,40 @@ pub fn simulate_remote(data: &[u8]) -> (blake3::Hash, Cursor) { } /// Wrap a bao store in a node that has gc enabled. -async fn wrap_in_node(bao_store: S, gc_period: Duration) -> Node +async fn wrap_in_node(bao_store: S, gc_period: Duration) -> (Node, flume::Receiver<()>) where S: iroh_blobs::store::Store, { let doc_store = iroh_docs::store::Store::memory(); - node::Builder::with_db_and_store(bao_store, doc_store, iroh::node::StorageConfig::Mem) - .gc_policy(iroh::node::GcPolicy::Interval(gc_period)) - .spawn() - .await - .unwrap() -} - -async fn attach_db_events( - node: &Node, -) -> flume::Receiver { - let (db_send, db_recv) = flume::unbounded(); - node.subscribe(move |ev| { - let db_send = db_send.clone(); - async move { - if let iroh::node::Event::Db(ev) = ev { - db_send.into_send_async(ev).await.ok(); - } - } - .boxed() - }) - .await - .unwrap(); - db_recv + let (gc_send, gc_recv) = flume::unbounded(); + let node = + node::Builder::with_db_and_store(bao_store, doc_store, iroh::node::StorageConfig::Mem) + .gc_policy(iroh::node::GcPolicy::Interval(gc_period)) + .register_gc_done_cb(Box::new(move || { + gc_send.send(()).ok(); + })) + .spawn() + .await + .unwrap(); + (node, gc_recv) } async fn gc_test_node() -> ( Node, iroh_blobs::store::mem::Store, - flume::Receiver, + flume::Receiver<()>, ) { let bao_store = iroh_blobs::store::mem::Store::new(); - let node = wrap_in_node(bao_store.clone(), Duration::from_millis(500)).await; - let db_recv = attach_db_events(&node).await; - (node, bao_store, db_recv) + let (node, gc_recv) = wrap_in_node(bao_store.clone(), Duration::from_millis(500)).await; + (node, bao_store, gc_recv) } -async fn step(evs: &flume::Receiver) { +async fn step(evs: &flume::Receiver<()>) { + // drain the event queue, we want a new GC while evs.try_recv().is_ok() {} + // wait for several GC cycles for _ in 0..3 { - while let Ok(ev) = evs.recv_async().await { - if let iroh_blobs::store::Event::GcCompleted = ev { - break; - } - } + evs.recv_async().await.unwrap(); } } @@ -246,7 +230,7 @@ mod file { let _ = tracing_subscriber::fmt::try_init(); let dir = testdir!(); let bao_store = iroh_blobs::store::fs::Store::load(dir.join("store")).await?; - let node = wrap_in_node(bao_store.clone(), Duration::from_secs(10)).await; + let (node, _) = wrap_in_node(bao_store.clone(), Duration::from_secs(10)).await; let client = node.client(); let doc = client.docs.create().await?; let author = client.authors.create().await?; @@ -289,8 +273,7 @@ mod file { let outboard_path = outboard_path(dir.clone()); let bao_store = iroh_blobs::store::fs::Store::load(dir.clone()).await?; - let node = wrap_in_node(bao_store.clone(), Duration::from_millis(100)).await; - let evs = attach_db_events(&node).await; + let (node, evs) = wrap_in_node(bao_store.clone(), Duration::from_millis(100)).await; let data1 = create_test_data(10000000); let tt1 = bao_store .import_bytes(data1.clone(), BlobFormat::Raw) @@ -452,8 +435,7 @@ mod file { let outboard_path = outboard_path(dir.clone()); let bao_store = iroh_blobs::store::fs::Store::load(dir.clone()).await?; - let node = wrap_in_node(bao_store.clone(), Duration::from_millis(10)).await; - let evs = attach_db_events(&node).await; + let (node, evs) = wrap_in_node(bao_store.clone(), Duration::from_millis(10)).await; let data1: Bytes = create_test_data(10000000); let (_entry, tt1) = simulate_download_partial(&bao_store, data1.clone()).await?; @@ -484,8 +466,7 @@ mod file { let dir = testdir!(); let bao_store = iroh_blobs::store::fs::Store::load(dir.clone()).await?; - let node = wrap_in_node(bao_store.clone(), Duration::from_secs(1)).await; - let evs = attach_db_events(&node).await; + let (node, evs) = wrap_in_node(bao_store.clone(), Duration::from_secs(1)).await; let mut deleted = Vec::new(); let mut live = Vec::new(); diff --git a/iroh/tests/provide.rs b/iroh/tests/provide.rs index dcec072ca9..a4f005fe58 100644 --- a/iroh/tests/provide.rs +++ b/iroh/tests/provide.rs @@ -5,15 +5,14 @@ use std::{ time::{Duration, Instant}, }; -use anyhow::{anyhow, Context, Result}; +use anyhow::{Context, Result}; use bytes::Bytes; use futures_lite::FutureExt; -use iroh::node::{Builder, Event}; +use iroh::node::Builder; use iroh_base::node_addr::AddrInfoOptions; use iroh_net::{defaults::default_relay_map, key::SecretKey, NodeAddr, NodeId}; use quic_rpc::transport::misc::DummyServerEndpoint; use rand::RngCore; -use tokio::sync::mpsc; use bao_tree::{blake3, ChunkNum, ChunkRanges}; use iroh_blobs::{ @@ -24,7 +23,6 @@ use iroh_blobs::{ Stats, }, protocol::{GetRequest, RangeSpecSeq}, - provider, store::{MapMut, Store}, BlobFormat, Hash, }; @@ -225,17 +223,6 @@ where let node = test_node(mdb.clone()).spawn().await?; - let (events_sender, mut events_recv) = mpsc::unbounded_channel(); - - node.subscribe(move |event| { - let events_sender = events_sender.clone(); - async move { - events_sender.send(event).ok(); - } - .boxed() - }) - .await?; - let addrs = node.local_endpoint_addresses().await?; let (secret_key, peer) = get_options(node.node_id(), addrs); let request = GetRequest::all(collection_hash); @@ -251,66 +238,11 @@ where assert_eq!(expected, got); } - // We have to wait for the completed event before shutting down the node. - let events = tokio::time::timeout(Duration::from_secs(30), async move { - let mut events = Vec::new(); - while let Some(event) = events_recv.recv().await { - match event { - Event::ByteProvide(provider::Event::TransferCompleted { .. }) - | Event::ByteProvide(provider::Event::TransferAborted { .. }) => { - events.push(event); - break; - } - _ => events.push(event), - } - } - events - }) - .await - .expect("duration expired"); - node.shutdown().await?; - assert_events(events, num_blobs + 1); - Ok(()) } -fn assert_events(events: Vec, num_blobs: usize) { - let num_basic_events = 4; - let num_total_events = num_basic_events + num_blobs; - assert_eq!( - events.len(), - num_total_events, - "missing events, only got {:#?}", - events - ); - assert!(matches!( - events[0], - Event::ByteProvide(provider::Event::ClientConnected { .. }) - )); - assert!(matches!( - events[1], - Event::ByteProvide(provider::Event::GetRequestReceived { .. }) - )); - assert!(matches!( - events[2], - Event::ByteProvide(provider::Event::TransferHashSeqStarted { .. }) - )); - for (i, event) in events[3..num_total_events - 1].iter().enumerate() { - match event { - Event::ByteProvide(provider::Event::TransferBlobCompleted { index, .. }) => { - assert_eq!(*index, i as u64); - } - _ => panic!("unexpected event {:?}", event), - } - } - assert!(matches!( - events.last().unwrap(), - Event::ByteProvide(provider::Event::TransferCompleted { .. }) - )); -} - #[tokio::test] async fn test_server_close() { // Prepare a Provider transferring a file. @@ -323,47 +255,11 @@ async fn test_server_close() { let node_addr = node.local_endpoint_addresses().await.unwrap(); let peer_id = node.node_id(); - let (events_sender, mut events_recv) = mpsc::unbounded_channel(); - node.subscribe(move |event| { - let events_sender = events_sender.clone(); - async move { - events_sender.send(event).ok(); - } - .boxed() - }) - .await - .unwrap(); let (secret_key, peer) = get_options(peer_id, node_addr); let request = GetRequest::all(hash); let (_collection, _children, _stats) = run_collection_get_request(secret_key, peer, request) .await .unwrap(); - - // Unwrap the JoinHandle, then the result of the Provider - tokio::time::timeout(Duration::from_secs(10), async move { - loop { - tokio::select! { - biased; - maybe_event = events_recv.recv() => { - match maybe_event { - Some(event) => match event { - Event::ByteProvide(provider::Event::TransferCompleted { .. }) => { - return node.shutdown().await; - }, - Event::ByteProvide(provider::Event::TransferAborted { .. }) => { - break Err(anyhow!("transfer aborted")); - } - _ => (), - } - None => break Err(anyhow!("events ended")), - } - } - } - } - }) - .await - .expect("supervisor timeout") - .expect("supervisor failed"); } /// create an in memory test database containing the given entries and an iroh collection of all entries