Skip to content

Commit

Permalink
try to fix things
Browse files Browse the repository at this point in the history
  • Loading branch information
Frando committed Jun 12, 2024
1 parent f750306 commit 8282b24
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 25 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions iroh/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ iroh-io = { version = "0.6.0", features = ["stats"] }
iroh-metrics = { version = "0.18.0", path = "../iroh-metrics", optional = true }
iroh-net = { version = "0.18.0", path = "../iroh-net" }
num_cpus = { version = "1.15.0" }
once_cell = "1.17.0"
portable-atomic = "1"
iroh-docs = { version = "0.18.0", path = "../iroh-docs" }
iroh-gossip = { version = "0.18.0", path = "../iroh-gossip" }
Expand Down
8 changes: 4 additions & 4 deletions iroh/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use iroh_base::key::PublicKey;
use iroh_blobs::downloader::Downloader;
use iroh_blobs::store::Store as BaoStore;
use iroh_docs::engine::Engine;

use iroh_net::{endpoint::LocalEndpointsStream, key::SecretKey, Endpoint};
use once_cell::sync::OnceCell;
use quic_rpc::transport::flume::FlumeConnection;
use quic_rpc::RpcClient;
use tokio::task::{JoinHandle, JoinSet};
Expand Down Expand Up @@ -47,7 +47,7 @@ pub use protocol::Protocol;
#[derive(Debug, Clone)]
pub struct Node<D> {
inner: Arc<NodeInner<D>>,
task: Arc<JoinHandle<()>>,
task: Arc<OnceCell<JoinHandle<()>>>,
client: crate::client::MemIroh,
protocols: ProtocolMap,
tasks: Arc<Mutex<Option<JoinSet<()>>>>,
Expand Down Expand Up @@ -174,8 +174,8 @@ impl<D: BaoStore> Node<D> {
pub async fn shutdown(self) -> Result<()> {
self.inner.cancel_token.cancel();

if let Ok(task) = Arc::try_unwrap(self.task) {
task.await?;
if let Ok(mut task) = Arc::try_unwrap(self.task) {
task.take().expect("cannot be empty").await?;
}
if let Some(mut tasks) = self.tasks.lock().unwrap().take() {
tasks.abort_all();
Expand Down
35 changes: 14 additions & 21 deletions iroh/src/node/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use quic_rpc::{
RpcServer, ServiceEndpoint,
};
use serde::{Deserialize, Serialize};
use tokio::{sync::oneshot, task::JoinSet};
use tokio::task::JoinSet;
use tokio_util::{sync::CancellationToken, task::LocalPoolHandle};
use tracing::{debug, error, error_span, info, trace, warn, Instrument};

Expand Down Expand Up @@ -553,24 +553,9 @@ where
rt: lp.clone(),
downloader,
});
let (ready_tx, ready_rx) = oneshot::channel();
let task = {
let protocols = protocols.clone();
let me = endpoint.node_id().fmt_short();
let inner = inner.clone();
tokio::task::spawn(
async move {
// Wait until the protocol builders have run.
ready_rx.await.expect("cannot fail");
Self::run(inner, protocols, self.rpc_endpoint, internal_rpc).await
}
.instrument(error_span!("node", %me)),
)
};

let node = Node {
inner,
task: Arc::new(task),
inner: inner.clone(),
task: Default::default(),
client,
protocols: protocols.clone(),
tasks: Default::default(),
Expand All @@ -581,6 +566,17 @@ where
protocols.insert(alpn, protocol);
}

let task = {
let protocols = protocols.clone();
let me = endpoint.node_id().fmt_short();
let inner = inner.clone();
tokio::task::spawn(
async move { Self::run(inner, protocols, self.rpc_endpoint, internal_rpc).await }
.instrument(error_span!("node", %me)),
)
};
node.task.set(task).expect("was empty");

let sync = protocols
.get::<DocsEngine>(DOCS_ALPN)
.context("docs engine not registered")?;
Expand All @@ -593,9 +589,6 @@ where
tasks.spawn_local(Self::gc_loop(db, sync, gc_period, gc_done_callback));
}

// Notify the run task that the protocols are now built.
ready_tx.send(()).expect("cannot fail");

// spawn a task that updates the gossip endpoints.
let mut stream = endpoint.local_endpoints();
let gossip = protocols.get::<Gossip>(GOSSIP_ALPN);
Expand Down

0 comments on commit 8282b24

Please sign in to comment.