fix: guarantee that PendingContentReady is only ever emitted after SyncFinished
Frando committed May 16, 2024
1 parent 07a0e6d commit 4031c15
Showing 5 changed files with 96 additions and 34 deletions.
6 changes: 4 additions & 2 deletions iroh/src/client/docs.rs
@@ -549,8 +549,10 @@ pub enum LiveEvent {
SyncFinished(SyncEvent),
/// All pending content is now ready.
///
/// This event is only emitted after a sync completed and `Self::SyncFinished` was emitted at
/// least once. It signals that all currently pending downloads have been completed.
/// This event signals that all queued content downloads from the last sync run have either
/// completed or failed.
///
/// It will only be emitted after a [`Self::SyncFinished`] event, never before.
///
/// Receiving this event does not guarantee that all content in the document is available. If
/// blobs failed to download, this event will still be emitted after all operations completed.
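To make the new ordering guarantee concrete, here is a minimal sketch of a subscriber loop that waits for the content of a completed sync. It is not part of this commit: the stream item type mirrors the `Stream<Item = Result<LiveEvent>>` shape used in the tests below, but the import path and surrounding setup are assumptions you may need to adapt.

```rust
use anyhow::Result;
use futures::{Stream, StreamExt};

// `LiveEvent` as defined in iroh/src/client/docs.rs; the exact import path
// depends on your iroh version (assumption for this sketch).
use iroh::client::docs::LiveEvent;

/// Wait until the content queued by a completed sync has finished downloading
/// (or failed). Relies on the guarantee that `PendingContentReady` is only
/// emitted after a `SyncFinished` event.
async fn wait_for_pending_content(
    mut events: impl Stream<Item = Result<LiveEvent>> + Unpin,
) -> Result<()> {
    let mut sync_finished = false;
    while let Some(event) = events.next().await {
        match event? {
            LiveEvent::SyncFinished(_) => {
                // A sync run completed; its content downloads may still be queued.
                sync_finished = true;
            }
            LiveEvent::PendingContentReady => {
                // Per the doc comment above, this never arrives before SyncFinished.
                debug_assert!(sync_finished);
                return Ok(());
            }
            _ => {}
        }
    }
    anyhow::bail!("event stream ended before PendingContentReady")
}
```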
6 changes: 4 additions & 2 deletions iroh/src/docs_engine.rs
@@ -228,8 +228,10 @@ pub enum LiveEvent {
},
/// All pending content is now ready.
///
/// This event is only emitted after a sync completed and `Self::SyncFinished` was emitted at
/// least once. It signals that all currently pending downloads have been completed.
/// This event signals that all queued content downloads from the last sync run have either
/// completed or failed.
///
/// It will only be emitted after a [`Self::SyncFinished`] event, never before.
///
/// Receiving this event does not guarantee that all content in the document is available. If
/// blobs failed to download, this event will still be emitted after all operations completed.
16 changes: 14 additions & 2 deletions iroh/src/docs_engine/live.rs
@@ -575,10 +575,18 @@ impl<B: iroh_blobs::store::Store> LiveActor<B> {
.send(&namespace, Event::SyncFinished(ev))
.await;

if !self.queued_hashes.contains_namespace(&namespace) {
// Check if there are queued pending content hashes for this namespace.
// If hashes are pending, mark this namespace to be eligible for a PendingContentReady event once all
// pending hashes have completed downloading.
// If no hashes are pending, emit the PendingContentReady event right away. The next
// PendingContentReady event may then only be emitted after the next sync completes.
if self.queued_hashes.contains_namespace(&namespace) {
self.state.set_may_emit_ready(&namespace, true);
} else {
self.subscribers
.send(&namespace, Event::PendingContentReady)
.await;
self.state.set_may_emit_ready(&namespace, false);
}

if resync {
@@ -620,6 +628,7 @@ impl<B: iroh_blobs::store::Store> LiveActor<B> {
res: Result<Stats, DownloadError>,
) {
let completed_namespaces = self.queued_hashes.remove_hash(&hash);
debug!(namespace=%namespace.fmt_short(), success=res.is_ok(), completed_namespaces=completed_namespaces.len(), "download ready");
if res.is_ok() {
self.subscribers
.send(&namespace, Event::ContentReady { hash })
@@ -631,7 +640,7 @@ impl<B: iroh_blobs::store::Store> LiveActor<B> {
self.missing_hashes.insert(hash);
}
for namespace in completed_namespaces.iter() {
if self.state.did_complete(namespace) {
if let Some(true) = self.state.may_emit_ready(namespace) {
self.subscribers
.send(namespace, Event::PendingContentReady)
.await;
@@ -678,6 +687,7 @@ impl<B: iroh_blobs::store::Store> LiveActor<B> {
async fn on_replica_event(&mut self, event: iroh_docs::Event) -> Result<()> {
match event {
iroh_docs::Event::LocalInsert { namespace, entry } => {
debug!(namespace=%namespace.fmt_short(), "replica event: LocalInsert");
let topic = TopicId::from_bytes(*namespace.as_bytes());
// A new entry was inserted locally. Broadcast a gossip message.
if self.state.is_syncing(&namespace) {
@@ -693,6 +703,7 @@ impl<B: iroh_blobs::store::Store> LiveActor<B> {
should_download,
remote_content_status,
} => {
debug!(namespace=%namespace.fmt_short(), "replica event: RemoteInsert");
// A new entry was inserted from initial sync or gossip. Queue downloading the
// content.
if should_download {
@@ -822,6 +833,7 @@ impl SubscribersMap {
}

async fn send(&mut self, namespace: &NamespaceId, event: Event) -> bool {
debug!(namespace=%namespace.fmt_short(), %event, "emit event");
let Some(subscribers) = self.0.get_mut(namespace) else {
return false;
};
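The `debug!` lines added above come from the `tracing` macros already used in this module. As a side note (not part of the commit), a common way to surface them when running the tests or a node locally is the usual `tracing-subscriber` setup; the filter string is only an assumption about the module path.

```rust
use tracing_subscriber::{fmt, EnvFilter};

fn init_logging() {
    // Honors RUST_LOG, e.g. `RUST_LOG=iroh::docs_engine=debug` to see the
    // "emit event", "download ready" and "replica event: ..." lines added here.
    fmt()
        .with_env_filter(EnvFilter::from_default_env())
        .init();
}
```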
39 changes: 27 additions & 12 deletions iroh/src/docs_engine/state.rs
@@ -52,6 +52,7 @@ pub struct NamespaceStates(BTreeMap<NamespaceId, NamespaceState>);
#[derive(Default)]
struct NamespaceState {
nodes: BTreeMap<NodeId, PeerState>,
may_emit_ready: bool,
}

impl NamespaceStates {
@@ -65,18 +66,6 @@ impl NamespaceStates {
self.0.entry(namespace).or_default();
}

/// Returns `true` if at least one sync was completed for this namespace.
pub fn did_complete(&self, namespace: &NamespaceId) -> bool {
self.0
.get(namespace)
.map(|s| {
s.nodes
.iter()
.any(|(_peer, state)| state.last_sync.is_some())
})
.unwrap_or(false)
}

/// Start a sync request.
///
/// Returns true if the request should be performed, and false if it should be aborted.
@@ -128,6 +117,32 @@ impl NamespaceStates {
state.finish(origin, result)
}

/// Set whether a [`super::Event::PendingContentReady`] may be emitted once the pending queue
/// becomes empty.
///
/// This should be set to `true` if there are pending content hashes after a sync finished, and
/// to `false` whenever a `PendingContentReady` was emitted.
pub fn set_may_emit_ready(&mut self, namespace: &NamespaceId, value: bool) -> Option<()> {
let state = self.0.get_mut(namespace)?;
state.may_emit_ready = value;
Some(())
}
/// Returns whether a [`super::Event::PendingContentReady`] event may be emitted once the
/// pending queue becomes empty.
///
/// If this returns `false`, an event should not be emitted even if the queue becomes empty,
/// because a currently running sync did not yet terminate. Once it terminates, the event will
/// be emitted from the handler for finished syncs.
pub fn may_emit_ready(&mut self, namespace: &NamespaceId) -> Option<bool> {
let state = self.0.get_mut(namespace)?;
if state.may_emit_ready {
state.may_emit_ready = false;
Some(true)
} else {
Some(false)
}
}

/// Remove a namespace from the set of syncing namespaces.
pub fn remove(&mut self, namespace: &NamespaceId) -> bool {
self.0.remove(namespace).is_some()
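Taken together with the live.rs changes above, `may_emit_ready` acts as a per-namespace, one-shot gate: it is armed when a sync finishes while content hashes are still queued, and consumed when the queue drains. The following condensed model (hypothetical types, not the iroh API) illustrates that lifecycle.

```rust
/// Simplified stand-in for the per-namespace flag in `NamespaceState`.
#[derive(Default)]
struct ReadyGate {
    may_emit_ready: bool,
}

impl ReadyGate {
    /// Called from the sync-finished handler when downloads are still queued.
    fn arm(&mut self) {
        self.may_emit_ready = true;
    }

    /// Called when the download queue for the namespace drains. Returns whether
    /// a `PendingContentReady` event should be emitted now; the flag is consumed,
    /// so the event fires at most once per completed sync.
    fn take(&mut self) -> bool {
        std::mem::take(&mut self.may_emit_ready)
    }
}

fn lifecycle_example() {
    let mut gate = ReadyGate::default();
    gate.arm();            // sync finished while content hashes were still queued
    assert!(gate.take());  // queue drained -> emit PendingContentReady
    assert!(!gate.take()); // queue drains again before the next sync -> no event
}
```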
63 changes: 47 additions & 16 deletions iroh/tests/sync.rs
@@ -117,7 +117,7 @@ async fn sync_simple() -> Result<()> {
assert_latest(&doc1, b"k1", b"v1").await;

info!("node0: assert 2 events");
assert_next_unordered(
assert_next(
&mut events0,
TIMEOUT,
vec![
@@ -297,12 +297,13 @@ async fn sync_full_basic() -> Result<()> {
.await;

info!("peer0: wait for 2 events (join & accept sync finished from peer1)");
assert_next_unordered(
assert_next(
&mut events0,
TIMEOUT,
vec![
match_event!(LiveEvent::NeighborUp(peer) if *peer == peer1),
Box::new(move |e| match_sync_finished(e, peer1)),
match_event!(LiveEvent::PendingContentReady),
],
)
.await;
@@ -315,27 +316,23 @@
.await?;
assert_latest(&doc1, key1, value1).await;
info!("peer1: wait for 1 event (local insert, and pendingcontentready)");
assert_next_unordered(
assert_next(
&mut events1,
TIMEOUT,
vec![
match_event!(LiveEvent::InsertLocal { entry} if entry.content_hash() == hash1),
match_event!(LiveEvent::PendingContentReady),
],
vec![match_event!(LiveEvent::InsertLocal { entry} if entry.content_hash() == hash1)],
)
.await;

// peer0: assert events for entry received via gossip
info!("peer0: wait for 2 events (gossip'ed entry from peer1)");
assert_next_unordered(
assert_next(
&mut events0,
TIMEOUT,
vec![
Box::new(
move |e| matches!(e, LiveEvent::InsertRemote { from, content_status: ContentStatus::Missing, .. } if *from == peer1),
),
Box::new(move |e| matches!(e, LiveEvent::ContentReady { hash } if *hash == hash1)),
match_event!(LiveEvent::PendingContentReady),
],
).await;
assert_latest(&doc0, key1, value1).await;
@@ -352,7 +349,7 @@ async fn sync_full_basic() -> Result<()> {
let peer2 = nodes[2].node_id();
let mut events2 = doc2.subscribe().await?;

info!("peer2: wait for 8 events (from sync with peers)");
info!("peer2: wait for 9 events (from sync with peers)");
assert_next_unordered_with_optionals(
&mut events2,
TIMEOUT,
@@ -374,7 +371,7 @@
// 2 ContentReady events
Box::new(move |e| matches!(e, LiveEvent::ContentReady { hash } if *hash == hash0)),
Box::new(move |e| matches!(e, LiveEvent::ContentReady { hash } if *hash == hash1)),
match_event!(LiveEvent::PendingContentReady),
// at least 1 PendingContentReady
match_event!(LiveEvent::PendingContentReady),
],
// optional events
@@ -393,23 +390,25 @@
assert_latest(&doc2, b"k2", b"v2").await;

info!("peer0: wait for 2 events (join & accept sync finished from peer2)");
assert_next_unordered(
assert_next(
&mut events0,
TIMEOUT,
vec![
Box::new(move |e| matches!(e, LiveEvent::NeighborUp(peer) if *peer == peer2)),
Box::new(move |e| match_sync_finished(e, peer2)),
match_event!(LiveEvent::PendingContentReady),
],
)
.await;

info!("peer1: wait for 2 events (join & accept sync finished from peer2)");
assert_next_unordered(
assert_next(
&mut events1,
TIMEOUT,
vec![
Box::new(move |e| matches!(e, LiveEvent::NeighborUp(peer) if *peer == peer2)),
Box::new(move |e| match_sync_finished(e, peer2)),
match_event!(LiveEvent::PendingContentReady),
],
)
.await;
@@ -559,9 +558,11 @@ async fn test_sync_via_relay() -> Result<()> {
Box::new(
move |e| matches!(e, LiveEvent::ContentReady { hash } if *hash == updated_hash),
),
],
vec![
Box::new(move |e| match_sync_finished(e, node1_id)),
Box::new(move |e| matches!(e, LiveEvent::PendingContentReady)),
],
vec![Box::new(move |e| match_sync_finished(e, node1_id))],
).await;
let actual = doc2
.get_exact(author1, b"foo", false)
@@ -674,10 +675,10 @@ async fn sync_restart_node() -> Result<()> {
match_event!(LiveEvent::SyncFinished(e) if e.peer == id2 && e.result.is_ok()),
match_event!(LiveEvent::InsertRemote { from, content_status: ContentStatus::Missing, .. } if *from == id2),
match_event!(LiveEvent::ContentReady { hash } if *hash == hash_b),
match_event!(LiveEvent::PendingContentReady),
],
vec![
match_event!(LiveEvent::SyncFinished(e) if e.peer == id2 && e.result.is_ok()),
match_event!(LiveEvent::PendingContentReady),
]
).await;
assert_latest(&doc1, b"n2/b", b"b").await;
@@ -691,10 +692,10 @@ async fn sync_restart_node() -> Result<()> {
vec![
match_event!(LiveEvent::InsertRemote { from, content_status: ContentStatus::Missing, .. } if *from == id2),
match_event!(LiveEvent::ContentReady { hash } if *hash == hash_c),
match_event!(LiveEvent::PendingContentReady),
],
vec![
match_event!(LiveEvent::SyncFinished(e) if e.peer == id2 && e.result.is_ok()),
match_event!(LiveEvent::PendingContentReady),
]
).await;

@@ -1207,6 +1208,36 @@ fn apply_matchers<T>(item: &T, matchers: &mut Vec<Box<dyn Fn(&T) -> bool + Send>
false
}

/// Receive the next `matchers.len()` elements from a stream and match them against the functions
/// in `matchers`, in order.
///
/// Returns all received events.
#[allow(clippy::type_complexity)]
async fn assert_next<T: std::fmt::Debug + Clone>(
mut stream: impl Stream<Item = Result<T>> + Unpin + Send,
timeout: Duration,
matchers: Vec<Box<dyn Fn(&T) -> bool + Send>>,
) -> Vec<T> {
let fut = async {
let mut items = vec![];
for (i, f) in matchers.iter().enumerate() {
let item = stream
.next()
.await
.expect("event stream ended prematurely")
.expect("event stream errored");
if !(f)(&item) {
panic!("assertion failed for event {i} {item:?}");
}
items.push(item);
}
items
};
let res = tokio::time::timeout(timeout, fut).await;
let events = res.expect("timeout reached");
events
}

/// Receive `matchers.len()` elements from a stream and assert that each element matches one of the
/// functions in `matchers`.
///
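For completeness, a small self-contained usage sketch of the new ordered `assert_next` helper outside the iroh test fixtures; the stream and matchers here are made up for illustration.

```rust
#[tokio::test]
async fn assert_next_example() {
    use futures::stream;
    use std::time::Duration;

    // Three events that must arrive in exactly this order.
    let items: Vec<anyhow::Result<u32>> = vec![Ok(1), Ok(2), Ok(3)];
    let events = stream::iter(items);
    let matchers: Vec<Box<dyn Fn(&u32) -> bool + Send>> = vec![
        Box::new(|n| *n == 1),
        Box::new(|n| *n == 2),
        Box::new(|n| *n == 3),
    ];
    let received = assert_next(events, Duration::from_secs(1), matchers).await;
    assert_eq!(received, vec![1, 2, 3]);
}
```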
