Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests(iroh): Test sync with restarting node #2146

Merged
merged 13 commits into from
May 3, 2024
9 changes: 7 additions & 2 deletions iroh-cli/src/commands/doc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,8 +605,13 @@ impl DocCommands {
Origin::Connect(_) => "we initiated",
};
match event.result {
Ok(()) => {
println!("synced peer {} ({origin})", fmt_short(event.peer))
Ok(details) => {
    // Fix: the format string opened a "(" after the peer id but never closed
    // it, printing e.g. "synced peer abcd (connect, received 3, sent 2" —
    // add the missing ")" so the output is balanced.
    println!(
        "synced peer {} ({origin}, received {}, sent {})",
        fmt_short(event.peer),
        details.entries_received,
        details.entries_sent
    )
}
Err(err) => println!(
"failed to sync with peer {} ({origin}): {err}",
Expand Down
3 changes: 2 additions & 1 deletion iroh-sync/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,7 @@ where
validate_entry(system_time_now(), store, namespace, &entry, &origin)?;

let outcome = self.store.put(entry.clone()).map_err(InsertError::Store)?;
tracing::debug!(?origin, hash = %entry.content_hash(), ?outcome, "insert");

let removed_count = match outcome {
InsertOutcome::Inserted { removed } => removed,
Expand Down Expand Up @@ -451,7 +452,7 @@ where

let download_policy = self
.store
.get_download_policy(&self.capability().id())
.get_download_policy(&self.id())
.unwrap_or_default();
let should_download = download_policy.matches(entry.entry());
Event::RemoteInsert {
Expand Down
1 change: 1 addition & 0 deletions iroh/src/sync_engine/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ impl GossipActor {
return Ok(false);
}
ToGossipActor::Join { namespace, peers } => {
debug!(?namespace, peers = peers.len(), "join gossip");
let gossip = self.gossip.clone();
// join gossip for the topic to receive and send message
let fut = async move {
Expand Down
28 changes: 24 additions & 4 deletions iroh/src/sync_engine/live.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ impl<B: iroh_bytes::store::Store> LiveActor<B> {
}

async fn start_sync(&mut self, namespace: NamespaceId, mut peers: Vec<NodeAddr>) -> Result<()> {
debug!(?namespace, peers = peers.len(), "start sync");
// update state to allow sync
if !self.state.is_syncing(&namespace) {
let opts = OpenOpts::default()
Expand Down Expand Up @@ -561,7 +562,7 @@ impl<B: iroh_bytes::store::Store> LiveActor<B> {
};

let result_for_event = match &result {
Ok(_) => Ok(()),
Ok(details) => Ok(details.into()),
Err(err) => Err(err.to_string()),
};

Expand Down Expand Up @@ -756,8 +757,10 @@ impl<B: iroh_bytes::store::Store> LiveActor<B> {
};
debug!("incoming connection");
let sync = self.sync.clone();
self.running_sync_accept
.spawn(async move { handle_connection(sync, conn, accept_request_cb).await });
self.running_sync_accept.spawn(
async move { handle_connection(sync, conn, accept_request_cb).await }
.instrument(Span::current()),
);
}

pub fn accept_sync_request(
Expand All @@ -782,7 +785,24 @@ pub struct SyncEvent {
/// Timestamp when the sync started
pub started: SystemTime,
/// Result of the sync operation
pub result: std::result::Result<(), String>,
pub result: std::result::Result<SyncDetails, String>,
}

/// Counters describing a completed sync session.
///
/// Carried in the `Ok` variant of [`SyncEvent::result`]; populated from a
/// finished sync's outcome (see the `From<&SyncFinished>` impl below).
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct SyncDetails {
    /// Number of entries received
    pub entries_received: usize,
    /// Number of entries sent
    pub entries_sent: usize,
}

impl From<&SyncFinished> for SyncDetails {
fn from(value: &SyncFinished) -> Self {
Self {
entries_received: value.outcome.num_recv,
entries_sent: value.outcome.num_sent,
}
}
}

#[derive(Debug, Default)]
Expand Down
16 changes: 7 additions & 9 deletions iroh/src/sync_engine/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,12 @@ impl NamespaceStates {
node: NodeId,
reason: SyncReason,
) -> bool {
let Some(state) = self.entry(namespace, node) else {
debug!("abort connect: namespace is not in sync set");
return false;
};
if state.start_connect(reason) {
true
} else {
debug!("abort connect: already syncing");
false
match self.entry(namespace, node) {
None => {
debug!("abort connect: namespace is not in sync set");
false
}
Some(state) => state.start_connect(reason),
}
}

Expand Down Expand Up @@ -170,6 +167,7 @@ impl PeerState {
}

fn start_connect(&mut self, reason: SyncReason) -> bool {
debug!(?reason, "start connect");
match self.state {
// never run two syncs at the same time
SyncState::Running { .. } => {
Expand Down
133 changes: 133 additions & 0 deletions iroh/tests/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ pub fn test_rng(seed: &[u8]) -> rand_chacha::ChaCha12Rng {
rand_chacha::ChaCha12Rng::from_seed(*Hash::new(seed).as_bytes())
}

/// Expands to a boxed predicate closure: `Box::new(move |e| matches!(e, <pattern>))`.
///
/// Accepts the same pattern-plus-optional-`if`-guard syntax as `matches!`, with an
/// optional trailing comma. Used to build the event matchers handed to
/// `assert_next_unordered_with_optionals` in the tests below.
macro_rules! match_event {
    ($pattern:pat $(if $guard:expr)? $(,)?) => {
        Box::new(move |e| matches!(e, $pattern $(if $guard)?))
    };
}

/// This tests the simplest scenario: A node connects to another node, and performs sync.
#[tokio::test]
async fn sync_simple() -> Result<()> {
Expand Down Expand Up @@ -552,6 +558,133 @@ async fn test_sync_via_relay() -> Result<()> {
Ok(())
}

#[tokio::test]
#[cfg(feature = "test-utils")]
// Verifies that a persistent node can be shut down and respawned with the same
// secret key and data directory, then (a) catch up via resync on entries written
// while it was offline, and (b) keep receiving new entries live afterwards.
async fn sync_restart_node() -> Result<()> {
let mut rng = test_rng(b"sync_restart_node");
setup_logging();
// Local relay + DNS/pkarr discovery so the respawned node1 can be reached again
// without the peers exchanging fresh direct addresses.
let (relay_map, _relay_url, _guard) = iroh_net::test_utils::run_relay_server().await?;

let discovery_server = iroh_net::test_utils::DnsPkarrServer::run().await?;

// node1 is persistent: its docs and node identity must survive the restart below.
let node1_dir = tempfile::TempDir::with_prefix("test-sync_restart_node-node1")?;
let secret_key_1 = SecretKey::generate_with_rng(&mut rng);

let node1 = Node::persistent(&node1_dir)
.await?
.secret_key(secret_key_1.clone())
.insecure_skip_relay_cert_verify(true)
.relay_mode(RelayMode::Custom(relay_map.clone()))
.dns_resolver(discovery_server.dns_resolver())
.node_discovery(discovery_server.discovery(secret_key_1.clone()).into())
.spawn()
.await?;
let id1 = node1.node_id();

// create doc & ticket on node1
let doc1 = node1.docs.create().await?;
let mut events1 = doc1.subscribe().await?;
let ticket = doc1
.share(ShareMode::Write, AddrInfoOptions::RelayAndAddresses)
.await?;

// create node2
// node2 is in-memory and stays up for the whole test; it is the peer that keeps
// writing while node1 restarts.
let secret_key_2 = SecretKey::generate_with_rng(&mut rng);
let node2 = Node::memory()
.secret_key(secret_key_2.clone())
.relay_mode(RelayMode::Custom(relay_map.clone()))
.insecure_skip_relay_cert_verify(true)
.dns_resolver(discovery_server.dns_resolver())
.node_discovery(discovery_server.discovery(secret_key_2.clone()).into())
.spawn()
.await?;
let id2 = node2.node_id();
let author2 = node2.authors.create().await?;
let doc2 = node2.docs.import(ticket.clone()).await?;

info!("node2 set a");
let hash_a = doc2.set_bytes(author2, "n2/a", "a").await?;
assert_latest(&doc2, b"n2/a", b"a").await;

// Initial sync: node1 should see node2 come up, finish a sync, and fetch "n2/a".
// A second SyncFinished is optional because both directions may report.
assert_next_unordered_with_optionals(
&mut events1,
Duration::from_secs(10),
vec![
match_event!(LiveEvent::NeighborUp(n) if *n == id2),
match_event!(LiveEvent::SyncFinished(e) if e.peer == id2 && e.result.is_ok()),
match_event!(LiveEvent::InsertRemote { from, content_status: ContentStatus::Missing, .. } if *from == id2),
match_event!(LiveEvent::ContentReady { hash } if *hash == hash_a),
],
vec![
match_event!(LiveEvent::SyncFinished(e) if e.peer == id2 && e.result.is_ok()),
],
)
.await;
assert_latest(&doc1, b"n2/a", b"a").await;

info!(me = id1.fmt_short(), "node1 start shutdown");
node1.shutdown().await?;
info!(me = id1.fmt_short(), "node1 down");

info!(me = id1.fmt_short(), "sleep 1s");
tokio::time::sleep(Duration::from_secs(1)).await;

// Write "n2/b" while node1 is offline; node1 must pick it up after restarting.
info!(me = id2.fmt_short(), "node2 set b");
let hash_b = doc2.set_bytes(author2, "n2/b", "b").await?;

// Respawn node1 from the same directory and secret key.
info!(me = id1.fmt_short(), "node1 respawn");
let node1 = Node::persistent(&node1_dir)
.await?
.secret_key(secret_key_1.clone())
.insecure_skip_relay_cert_verify(true)
.relay_mode(RelayMode::Custom(relay_map.clone()))
.dns_resolver(discovery_server.dns_resolver())
.node_discovery(discovery_server.discovery(secret_key_1.clone()).into())
.spawn()
.await?;
assert_eq!(id1, node1.node_id());

// The doc and the pre-restart entry must have been persisted.
let doc1 = node1.docs.open(doc1.id()).await?.expect("doc to exist");
let mut events1 = doc1.subscribe().await?;
assert_latest(&doc1, b"n2/a", b"a").await;

// check that initial resync is working
// start_sync with no explicit peers: node1 must rediscover node2 and fetch "n2/b".
doc1.start_sync(vec![]).await?;
assert_next_unordered_with_optionals(
&mut events1,
Duration::from_secs(10),
vec![
match_event!(LiveEvent::NeighborUp(n) if *n== id2),
match_event!(LiveEvent::SyncFinished(e) if e.peer == id2 && e.result.is_ok()),
match_event!(LiveEvent::InsertRemote { from, content_status: ContentStatus::Missing, .. } if *from == id2),
match_event!(LiveEvent::ContentReady { hash } if *hash == hash_b),
],
vec![
match_event!(LiveEvent::SyncFinished(e) if e.peer == id2 && e.result.is_ok()),
]
).await;
assert_latest(&doc1, b"n2/b", b"b").await;

// check that live conn is working
// A fresh write on node2 should now arrive over the live connection without
// another explicit resync.
info!(me = id2.fmt_short(), "node2 set c");
let hash_c = doc2.set_bytes(author2, "n2/c", "c").await?;
assert_next_unordered_with_optionals(
&mut events1,
Duration::from_secs(10),
vec![
match_event!(LiveEvent::InsertRemote { from, content_status: ContentStatus::Missing, .. } if *from == id2),
match_event!(LiveEvent::ContentReady { hash } if *hash == hash_c),
],
vec![
match_event!(LiveEvent::SyncFinished(e) if e.peer == id2 && e.result.is_ok()),
]
).await;

assert_latest(&doc1, b"n2/c", b"c").await;

Ok(())
}

/// Joins two nodes that write to the same document but have differing download policies and tests
/// that they both synced the key info but not the content.
#[tokio::test]
Expand Down
Loading