Skip to content

Commit

Permalink
feat(iroh)!: remove node events (#2274)
Browse files Browse the repository at this point in the history
These seem to be mostly unused by consumers, and add complexity.

## Breaking Changes

- remove:
  -  `iroh::node::Event`
  - `iroh::node::Node::subscribe`

## Notes

- [x] Still need to figure out how to migrate the tests

---------

Co-authored-by: Ruediger Klaehn <[email protected]>
  • Loading branch information
dignifiedquire and rklaehn authored May 21, 2024
1 parent 8ad6ff1 commit b412927
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 264 deletions.
82 changes: 2 additions & 80 deletions iroh/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,21 @@
//!
//! A node is a server that serves various protocols.
//!
//! You can monitor what is happening in the node using [`Node::subscribe`].
//!
//! To shut down the node, call [`Node::shutdown`].
use std::fmt::Debug;
use std::net::SocketAddr;
use std::path::Path;
use std::sync::Arc;

use anyhow::{anyhow, Result};
use futures_lite::{future::Boxed as BoxFuture, FutureExt, StreamExt};
use futures_lite::StreamExt;
use iroh_base::key::PublicKey;
use iroh_blobs::downloader::Downloader;
use iroh_blobs::store::Store as BaoStore;
use iroh_net::util::AbortingJoinHandle;
use iroh_net::{endpoint::LocalEndpointsStream, key::SecretKey, Endpoint};
use quic_rpc::transport::flume::FlumeConnection;
use quic_rpc::RpcClient;
use tokio::sync::{mpsc, RwLock};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tokio_util::task::LocalPoolHandle;
Expand All @@ -35,38 +32,6 @@ mod rpc_status;
pub use self::builder::{Builder, DiscoveryConfig, GcPolicy, StorageConfig};
pub use self::rpc_status::RpcStatus;

type EventCallback = Box<dyn Fn(Event) -> BoxFuture<()> + 'static + Sync + Send>;

#[derive(Default, derive_more::Debug, Clone)]
struct Callbacks(#[debug("..")] Arc<RwLock<Vec<EventCallback>>>);

impl Callbacks {
async fn push(&self, cb: EventCallback) {
self.0.write().await.push(cb);
}

#[allow(dead_code)]
async fn send(&self, event: Event) {
let cbs = self.0.read().await;
for cb in &*cbs {
cb(event.clone()).await;
}
}
}

impl iroh_blobs::provider::EventSender for Callbacks {
fn send(&self, event: iroh_blobs::provider::Event) -> BoxFuture<()> {
let this = self.clone();
async move {
let cbs = this.0.read().await;
for cb in &*cbs {
cb(Event::ByteProvide(event.clone())).await;
}
}
.boxed()
}
}

/// A server which implements the iroh node.
///
/// Clients can connect to this server and requests hashes from it.
Expand All @@ -91,9 +56,6 @@ struct NodeInner<D> {
secret_key: SecretKey,
cancel_token: CancellationToken,
controller: FlumeConnection<Response, Request>,
#[debug("callbacks: Sender<Box<dyn Fn(Event)>>")]
cb_sender: mpsc::Sender<Box<dyn Fn(Event) -> BoxFuture<()> + Send + Sync + 'static>>,
callbacks: Callbacks,
#[allow(dead_code)]
gc_task: Option<AbortingJoinHandle<()>>,
#[debug("rt")]
Expand All @@ -102,15 +64,6 @@ struct NodeInner<D> {
downloader: Downloader,
}

/// Events emitted by the [`Node`] informing about the current status.
#[derive(Debug, Clone)]
pub enum Event {
/// Events from the iroh-blobs transfer protocol.
ByteProvide(iroh_blobs::provider::Event),
/// Events from database
Db(iroh_blobs::store::Event),
}

/// In memory node.
pub type MemNode = Node<iroh_blobs::store::mem::Store>;

Expand Down Expand Up @@ -177,18 +130,6 @@ impl<D: BaoStore> Node<D> {
self.inner.secret_key.public()
}

/// Subscribe to [`Event`]s emitted from the node, informing about connections and
/// progress.
///
/// Warning: The callback must complete quickly, as otherwise it will block ongoing work.
pub async fn subscribe<F: Fn(Event) -> BoxFuture<()> + Send + Sync + 'static>(
&self,
cb: F,
) -> Result<()> {
self.inner.cb_sender.send(Box::new(cb)).await?;
Ok(())
}

/// Returns a handle that can be used to do RPC calls to the node internally.
pub fn controller(&self) -> crate::client::MemRpcClient {
RpcClient::new(self.inner.controller.clone())
Expand Down Expand Up @@ -319,23 +260,7 @@ mod tests {

let _drop_guard = node.cancel_token().drop_guard();

let (r, mut s) = mpsc::channel(1);
node.subscribe(move |event| {
let r = r.clone();
async move {
if let Event::ByteProvide(iroh_blobs::provider::Event::TaggedBlobAdded {
hash,
..
}) = event
{
r.send(hash).await.ok();
}
}
.boxed()
})
.await?;

let got_hash = tokio::time::timeout(Duration::from_secs(1), async move {
let _got_hash = tokio::time::timeout(Duration::from_secs(1), async move {
let mut stream = node
.controller()
.server_streaming(BlobAddPathRequest {
Expand Down Expand Up @@ -364,9 +289,6 @@ mod tests {
.context("timeout")?
.context("get failed")?;

let event_hash = s.recv().await.expect("missing add tagged blob event");
assert_eq!(got_hash, event_hash);

Ok(())
}

Expand Down
66 changes: 38 additions & 28 deletions iroh/src/node/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,18 @@ use quic_rpc::{
RpcServer, ServiceEndpoint,
};
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use tokio_util::{sync::CancellationToken, task::LocalPoolHandle};
use tracing::{debug, error, error_span, info, trace, warn, Instrument};

use crate::{
client::RPC_ALPN,
docs_engine::Engine,
node::{Event, NodeInner},
node::NodeInner,
rpc_protocol::{Request, Response, RpcService},
util::{fs::load_secret_key, path::IrohPaths},
};

use super::{rpc, rpc_status::RpcStatus, Callbacks, EventCallback, Node};
use super::{rpc, rpc_status::RpcStatus, Node};

pub const PROTOCOLS: [&[u8]; 3] = [iroh_blobs::protocol::ALPN, GOSSIP_ALPN, DOCS_ALPN];

Expand Down Expand Up @@ -69,7 +68,7 @@ const MAX_STREAMS: u64 = 10;
///
/// The returned [`Node`] is awaitable to know when it finishes. It can be terminated
/// using [`Node::shutdown`].
#[derive(Debug)]
#[derive(derive_more::Debug)]
pub struct Builder<D, E = DummyServerEndpoint>
where
D: Map,
Expand All @@ -88,6 +87,9 @@ where
docs_store: iroh_docs::store::fs::Store,
#[cfg(any(test, feature = "test-utils"))]
insecure_skip_relay_cert_verify: bool,
/// Callback to register when a gc loop is done
#[debug("callback")]
gc_done_callback: Option<Box<dyn Fn() + Send>>,
}

/// Configuration for storage.
Expand Down Expand Up @@ -135,6 +137,7 @@ impl Default for Builder<iroh_blobs::store::mem::Store> {
node_discovery: Default::default(),
#[cfg(any(test, feature = "test-utils"))]
insecure_skip_relay_cert_verify: false,
gc_done_callback: None,
}
}
}
Expand All @@ -160,6 +163,7 @@ impl<D: Map> Builder<D> {
node_discovery: Default::default(),
#[cfg(any(test, feature = "test-utils"))]
insecure_skip_relay_cert_verify: false,
gc_done_callback: None,
}
}
}
Expand Down Expand Up @@ -222,6 +226,7 @@ where
node_discovery: self.node_discovery,
#[cfg(any(test, feature = "test-utils"))]
insecure_skip_relay_cert_verify: false,
gc_done_callback: self.gc_done_callback,
})
}

Expand All @@ -242,6 +247,7 @@ where
node_discovery: self.node_discovery,
#[cfg(any(test, feature = "test-utils"))]
insecure_skip_relay_cert_verify: self.insecure_skip_relay_cert_verify,
gc_done_callback: self.gc_done_callback,
}
}

Expand All @@ -267,6 +273,7 @@ where
node_discovery: self.node_discovery,
#[cfg(any(test, feature = "test-utils"))]
insecure_skip_relay_cert_verify: self.insecure_skip_relay_cert_verify,
gc_done_callback: self.gc_done_callback,
})
}

Expand Down Expand Up @@ -337,6 +344,13 @@ where
self
}

/// Register a callback for when GC is done.
#[cfg(any(test, feature = "test-utils"))]
pub fn register_gc_done_cb(mut self, cb: Box<dyn Fn() + Send>) -> Self {
self.gc_done_callback.replace(cb);
self
}

/// Whether to log the SSL pre-master key.
///
/// If `true` and the `SSLKEYLOGFILE` environment variable is the path to a file this
Expand All @@ -352,7 +366,7 @@ where
/// This will create the underlying network server and spawn a tokio task accepting
/// connections. The returned [`Node`] can be used to control the task as well as
/// get information about it.
pub async fn spawn(self) -> Result<Node<D>> {
pub async fn spawn(mut self) -> Result<Node<D>> {
trace!("spawning node");
let lp = LocalPoolHandle::new(num_cpus::get());

Expand Down Expand Up @@ -406,7 +420,6 @@ where
let endpoint = endpoint.bind(bind_port).await?;
trace!("created quinn endpoint");

let (cb_sender, cb_receiver) = mpsc::channel(8);
let cancel_token = CancellationToken::new();

debug!("rpc listening on: {:?}", self.rpc_endpoint.local_addr());
Expand All @@ -427,12 +440,13 @@ where
);
let sync_db = sync.sync.clone();

let callbacks = Callbacks::default();
let gc_task = if let GcPolicy::Interval(gc_period) = self.gc_policy {
tracing::info!("Starting GC task with interval {:?}", gc_period);
let db = self.blobs_store.clone();
let callbacks = callbacks.clone();
let task = lp.spawn_pinned(move || Self::gc_loop(db, sync_db, gc_period, callbacks));
let gc_done_callback = self.gc_done_callback.take();

let task =
lp.spawn_pinned(move || Self::gc_loop(db, sync_db, gc_period, gc_done_callback));
Some(task.into())
} else {
None
Expand All @@ -446,8 +460,6 @@ where
secret_key: self.secret_key,
controller,
cancel_token,
callbacks: callbacks.clone(),
cb_sender,
gc_task,
rt: lp.clone(),
sync,
Expand All @@ -464,8 +476,6 @@ where
async move {
Self::run(
ep,
callbacks,
cb_receiver,
handler,
self.rpc_endpoint,
internal_rpc,
Expand Down Expand Up @@ -508,8 +518,6 @@ where
#[allow(clippy::too_many_arguments)]
async fn run(
server: Endpoint,
callbacks: Callbacks,
mut cb_receiver: mpsc::Receiver<EventCallback>,
handler: rpc::Handler<D>,
rpc: E,
internal_rpc: impl ServiceEndpoint<RpcService>,
Expand Down Expand Up @@ -586,10 +594,6 @@ where
}
});
},
// Handle new callbacks
Some(cb) = cb_receiver.recv() => {
callbacks.push(cb).await;
}
else => break,
}
}
Expand All @@ -609,7 +613,7 @@ where
db: D,
ds: iroh_docs::actor::SyncHandle,
gc_period: Duration,
callbacks: Callbacks,
done_cb: Option<Box<dyn Fn() + Send>>,
) {
let mut live = BTreeSet::new();
tracing::debug!("GC loop starting {:?}", gc_period);
Expand All @@ -623,14 +627,11 @@ where
// do delay before the two phases of GC
tokio::time::sleep(gc_period).await;
tracing::debug!("Starting GC");
callbacks
.send(Event::Db(iroh_blobs::store::Event::GcStarted))
.await;
live.clear();
let doc_hashes = match ds.content_hashes().await {
Ok(hashes) => hashes,
Err(err) => {
tracing::error!("Error getting doc hashes: {}", err);
tracing::warn!("Error getting doc hashes: {}", err);
continue 'outer;
}
};
Expand Down Expand Up @@ -680,9 +681,9 @@ where
}
}
}
callbacks
.send(Event::Db(iroh_blobs::store::Event::GcCompleted))
.await;
if let Some(ref cb) = done_cb {
cb();
}
}
}
}
Expand Down Expand Up @@ -719,7 +720,7 @@ async fn handle_connection<D: BaoStore>(
iroh_blobs::provider::handle_connection(
connection,
node.db.clone(),
node.callbacks.clone(),
MockEventSender,
node.rt.clone(),
)
.await
Expand Down Expand Up @@ -776,3 +777,12 @@ fn make_rpc_endpoint(

Ok((rpc_endpoint, actual_rpc_port))
}

#[derive(Debug, Clone)]
struct MockEventSender;

impl iroh_blobs::provider::EventSender for MockEventSender {
fn send(&self, _event: iroh_blobs::provider::Event) -> futures_lite::future::Boxed<()> {
Box::pin(std::future::ready(()))
}
}
9 changes: 1 addition & 8 deletions iroh/src/node/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ use crate::rpc_protocol::{
NodeWatchResponse, Request, RpcService, SetTagOption,
};

use super::{Event, NodeInner};
use super::NodeInner;

const HEALTH_POLL_WAIT: Duration = Duration::from_secs(1);
/// Chunk size for getting blobs over RPC
Expand Down Expand Up @@ -761,13 +761,6 @@ impl<D: BaoStore> Handler<D> {
tag: tag.clone(),
})
.await?;
self.inner
.callbacks
.send(Event::ByteProvide(
iroh_blobs::provider::Event::TaggedBlobAdded { hash, format, tag },
))
.await;

Ok(())
}

Expand Down
Loading

0 comments on commit b412927

Please sign in to comment.