From a26e7478affacab189a28be31f65e11cbb59b325 Mon Sep 17 00:00:00 2001
From: Jason Carver
Date: Tue, 17 Sep 2024 17:05:36 -0700
Subject: [PATCH] feat: keep searching for content after a transfer failure

If a find content query is transferring content from a peer and the
transfer fails, then resume contacting peers to get the content from
another one.

Ways we have seen a content transfer fail:
- the peer disappears during the uTP transfer
- the peer sends invalid content
- the peer implements uTP differently

For now, partly to simplify testing and the architecture, we only
validate one piece of content at a time. When a piece of content is
returned, we prioritize starting its validation. While a validation is
in progress, we pause all other activity: finding new peers, asking for
the content elsewhere, and validating more copies of already-found
content. Those activities resume only if the active validation fails.

It would be nice to run validation on 2 or more copies of the content
at a time, but testing that would be much more complicated (at least
with the current testing approach).
---
 portalnet/src/find/iterators/findcontent.rs | 215 +++++++++++++-------
 portalnet/src/overlay/service.rs            |  10 +-
 2 files changed, 151 insertions(+), 74 deletions(-)

diff --git a/portalnet/src/find/iterators/findcontent.rs b/portalnet/src/find/iterators/findcontent.rs
index 169332cfa..e94eea4ee 100644
--- a/portalnet/src/find/iterators/findcontent.rs
+++ b/portalnet/src/find/iterators/findcontent.rs
@@ -56,7 +56,7 @@ pub enum FindContentQueryPending<TNodeId> {
         // peer that sent the content
         peer: TNodeId,
         // channel used to send back the validated content info
-        valid_content_tx: Sender<ValidatedContent<TNodeId>>,
+        valid_content_tx: Sender<Option<ValidatedContent<TNodeId>>>,
     },
     Utp {
         connection_id: u16,
@@ -64,7 +64,7 @@ pub enum FindContentQueryPending<TNodeId> {
         // peer used to create utp stream
         peer: TNodeId,
         // channel used to send back the validated content info
-        valid_content_tx: Sender<ValidatedContent<TNodeId>>,
+        valid_content_tx: Sender<Option<ValidatedContent<TNodeId>>>,
     },
 }
 
@@ -114,8 +114,8 @@ pub struct FindContentQuery<TNodeId> {
     pending_validations: VecDeque<TNodeId>,
 
     /// A channel to receive the final content after validation
-    content_rx: Receiver<ValidatedContent<TNodeId>>,
-    content_tx: Sender<ValidatedContent<TNodeId>>,
+    content_rx: Receiver<Option<ValidatedContent<TNodeId>>>,
+    content_tx: Sender<Option<ValidatedContent<TNodeId>>>,
 
     /// The received content, after validation.
     validated_content: Option<ValidatedContent<TNodeId>>,
@@ -268,9 +268,14 @@ where
 
         // If the content for this query has been marked as valid, then exit
         if let Ok(valid_content) = self.content_rx.try_recv() {
-            self.progress = QueryProgress::Finished;
-            self.validated_content = Some(valid_content);
-            return QueryState::Finished;
+            if let Some(valid_content) = valid_content {
+                self.progress = QueryProgress::Finished;
+                self.validated_content = Some(valid_content);
+                return QueryState::Finished;
+            } else {
+                // The content was marked as invalid. Continue the query.
+                self.num_validating -= 1;
+            }
         }
 
         if self.num_validating > 0 {
@@ -321,10 +326,10 @@ where
                 }
             }
 
-            QueryPeerState::Succeeded => {}
-
-            QueryPeerState::Failed | QueryPeerState::Unresponsive => {
-                // Skip over unresponsive or failed peers.
+            QueryPeerState::Succeeded
+            | QueryPeerState::Failed
+            | QueryPeerState::Unresponsive => {
+                // These are all terminal conditions for a peer, so there is no more work to do
             }
         }
     }
@@ -567,88 +572,157 @@ mod tests {
         let max_parallelism = usize::min(query.config.parallelism, remaining.len());
         let target = query.target_key.clone();
 
-        let mut expected: Vec<_>;
+        trace!("***** starting termination_and_parallelism test with target {target:?} *****");
+        // This tracks peers that have returned content, and are waiting to be polled
+        let mut new_validations: VecDeque<Key<NodeId>> = Default::default();
+        // This tracks peers that have returned content, have been polled out of the query,
+        // but validation status is unknown
+        let mut validating: Vec<_> = vec![];
         let found_content: Vec<u8> = vec![0xef];
        let mut content_peer = None;
 
         'finished: loop {
-            if remaining.is_empty() {
+            let query_states = if !validating.is_empty() {
+                // Verify that we don't poll peers while actively validating content
+                match query.poll(now) {
+                    QueryState::WaitingAtCapacity => {}
+                    QueryState::Finished => {}
+                    state => panic!("Expected to pause peer polling while validating, got {state:?}. Still validating these peers: {validating:?}"),
+                }
+                vec![]
+            } else if let Some(k) = new_validations.pop_front() {
+                let query_state = query.poll(now);
+                match query_state {
+                    QueryState::Validating(p) => {
+                        trace!("peer {k:?} needs to have content verified");
+                        assert_eq!(&p, k.preimage());
+                        // For now, we only do one validation at a time
+                        assert_eq!(query.poll(now), QueryState::WaitingAtCapacity);
+                        vec![query_state]
+                    }
+                    QueryState::Finished => vec![],
+                    actual => panic!("Expected validation state, got {actual:?}"),
+                }
+            } else if remaining.is_empty() {
                 trace!("ending test: no more peers to pull from");
                 break;
             } else {
                 // Split off the next (up to) `parallelism` peers, who we expect to poll.
                 let num_expected = min(max_parallelism, remaining.len());
-                expected = remaining.drain(..num_expected).collect();
-            }
-            // Advance the query for maximum parallelism.
-            for k in expected.iter() {
-                match query.poll(now) {
-                    QueryState::Finished => {
-                        trace!("Ending test loop: query state is finished");
-                        break 'finished;
-                    }
-                    QueryState::Waiting(Some(p)) => assert_eq!(&p, k.preimage()),
-                    QueryState::Waiting(None) => panic!("Expected another peer."),
-                    QueryState::WaitingAtCapacity => panic!("Unexpectedly reached capacity."),
-                    QueryState::Validating(p) => assert_eq!(&p, k.preimage()),
-                }
+                // Advance the query for maximum parallelism.
+                remaining
+                    .drain(..num_expected)
+                    .map_while(|k| {
+                        let query_state = query.poll(now);
+                        match query_state {
+                            QueryState::Finished => None,
+                            QueryState::Waiting(Some(p)) => {
+                                assert_eq!(&p, k.preimage());
+                                Some(query_state)
+                            }
+                            QueryState::Waiting(None) => panic!("Expected another peer."),
+                            QueryState::WaitingAtCapacity => {
+                                panic!("Should not go over capacity")
+                            }
+                            QueryState::Validating(_) => {
+                                panic!("Didn't expect new validations.")
+                            }
+                        }
+                    })
+                    .collect::<Vec<_>>()
+            };
+
+            if query.progress == QueryProgress::Finished {
+                trace!("Ending test loop: query state is finished");
+                break 'finished;
             }
-            let num_waiting = query.num_waiting;
-            assert_eq!(num_waiting, expected.len());
+            let num_waiting = query_states.len();
 
             // Check the bounded parallelism.
             if query.at_capacity() {
                 assert_eq!(query.poll(now), QueryState::WaitingAtCapacity)
             }
 
-            for (i, k) in expected.iter().enumerate() {
-                if rng.gen_bool(0.75) {
-                    // With a small probability, return the desired content. Otherwise, return
-                    // a list of random "closer" peers.
-                    if rng.gen_bool(0.05) {
-                        let peer_node_id = k.preimage();
-                        query.on_success(
-                            peer_node_id,
-                            FindContentQueryResponse::Content(found_content.clone()),
-                        );
-                        // The first peer to return the content should be the one reported at
-                        // the end.
-                        if content_peer.is_none() {
-                            content_peer = Some(k.clone());
+            for (i, state) in query_states.iter().enumerate() {
+                match state {
+                    QueryState::Waiting(Some(p)) => {
+                        let k = Key::from(*p);
+                        if rng.gen_bool(0.75) {
+                            // With a small probability, return the desired content. Otherwise,
+                            // return a list of random "closer" peers.
+                            if rng.gen_bool(0.05) {
+                                trace!("peer {k:?} returned content");
+                                let peer_node_id = k.preimage();
+                                query.on_success(
+                                    peer_node_id,
+                                    FindContentQueryResponse::Content(found_content.clone()),
+                                );
+                                // The peer that returned content is now validating.
+                                new_validations.push_back(k);
+                            } else {
+                                let num_closer = rng.gen_range(0..query.config.num_results + 1);
+                                let closer_peers = random_nodes(num_closer).collect::<Vec<_>>();
+                                remaining.extend(closer_peers.iter().map(|x| Key::from(*x)));
+                                query.on_success(
+                                    k.preimage(),
+                                    FindContentQueryResponse::ClosestNodes(closer_peers),
+                                );
+                            }
+                        } else {
+                            query.on_failure(k.preimage());
                         }
-                    }
+                    }
-                        // Immediately validate the content
-                        match query.pending_validation_result(*peer_node_id) {
-                            FindContentQueryPending::PendingContent {
+                    QueryState::Validating(p) => {
+                        let k = Key::from(*p);
+                        trace!("peer {k:?} is queued for validation");
+                        // Start simulated validation process
+                        validating.push(k);
+                    }
+                    _ => panic!("Unexpected query state: {state:?}"),
+                }
+                assert_eq!(query.num_waiting, num_waiting - (i + 1));
+            }
+
+            validating.retain(|k| {
+                if rng.gen_bool(0.3) {
+                    // Mark pending content as valid or not
+                    let node_id = k.preimage();
+                    match query.pending_validation_result(*node_id) {
+                        FindContentQueryPending::PendingContent {
+                            content,
+                            nodes_to_poke: _,
+                            peer,
+                            valid_content_tx,
+                        } => {
+                            let validated_content = ValidatedContent {
                                 content,
-                                nodes_to_poke: _,
-                                peer,
-                                valid_content_tx,
-                            } => {
-                                let validated_content = ValidatedContent {
-                                    content,
-                                    was_utp_transfer: false,
-                                    sending_peer: peer,
-                                };
-                                valid_content_tx.send(validated_content).unwrap();
+                                was_utp_transfer: false,
+                                sending_peer: peer,
+                            };
+                            if rng.gen_bool(0.7) {
+                                trace!("peer {k:?} content is valid");
+                                // Track which peer is first to return valid content.
+                                // That should be the final reported peer.
+                                if content_peer.is_none() {
+                                    content_peer = Some(k.clone());
+                                }
+                                valid_content_tx.send(Some(validated_content)).unwrap();
+                            } else {
+                                trace!("peer {k:?} content is invalid");
+                                valid_content_tx.send(None).unwrap();
                             }
-                            result => panic!("Unexpected result: {result:?}"),
                         }
-                    } else {
-                        let num_closer = rng.gen_range(0..query.config.num_results + 1);
-                        let closer_peers = random_nodes(num_closer).collect::<Vec<_>>();
-                        remaining.extend(closer_peers.iter().map(|x| Key::from(*x)));
-                        query.on_success(
-                            k.preimage(),
-                            FindContentQueryResponse::ClosestNodes(closer_peers),
-                        );
+                        result => panic!("Unexpected result: {result:?}"),
                     }
+                    false
                 } else {
-                    query.on_failure(k.preimage());
+                    // Keep the peer key, to try validating later
+                    trace!("peer {k:?} content is still pending validation");
+                    true
                 }
-                assert_eq!(query.num_waiting, num_waiting - (i + 1));
-            }
+            });
 
             // Re-sort the remaining expected peers for the next "round".
             remaining.sort_by_key(|k| target.distance(k));
@@ -672,8 +746,7 @@ mod tests {
             })
             .collect();
 
-        let result = query.into_result();
-        match result {
+        match query.into_result() {
             FindContentQueryResult::ValidContent(validated_content, _cancelled_peers) => {
                 assert_eq!(validated_content.content, found_content);
 
@@ -691,7 +764,7 @@ mod tests {
             }
         }
 
-        QuickCheck::new().tests(100).quickcheck(prop as fn(_) -> _)
+        QuickCheck::new().tests(300).quickcheck(prop as fn(_) -> _)
     }
 
     #[test]
diff --git a/portalnet/src/overlay/service.rs b/portalnet/src/overlay/service.rs
index 308646582..e582d5334 100644
--- a/portalnet/src/overlay/service.rs
+++ b/portalnet/src/overlay/service.rs
@@ -707,6 +707,8 @@ where
                             %e,
                             "Failed to connect to inbound uTP stream for FindContent"
                         );
+                        // Indicate to the query that the content is invalid
+                        let _ = valid_content_tx.send(None);
                         return;
                     }
                 };
@@ -1873,7 +1875,7 @@ where
         nodes_to_poke: Vec<NodeId>,
         utp_processing: UtpProcessing<TValidator, TStore, TContentKey>,
         sending_peer: NodeId,
-        valid_content_callback: Sender<ValidatedContent<NodeId>>,
+        valid_content_callback: Sender<Option<ValidatedContent<NodeId>>>,
     ) {
         let mut content = content;
         // Operate under assumption that all content in the store is valid
@@ -1900,6 +1902,8 @@ where
                     content.key = %content_key,
                     "Error validating content"
                 );
+                // Indicate to the query that the content is invalid
+                let _ = valid_content_callback.send(None);
                 return;
             }
         };
@@ -1956,11 +1960,11 @@ where
         }
 
         if valid_content_callback
-            .send(ValidatedContent {
+            .send(Some(ValidatedContent {
                 content: content.clone(),
                 was_utp_transfer: utp_transfer,
                 sending_peer,
-            })
+            }))
             .is_err()
         {
             warn!("The content query has exited before the returned content could be marked as valid. Perhaps a timeout, or a parallel copy of the content was validated first.");
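A note for reviewers on the shape of the new signalling, separate from the patch itself: the query hands each validator a Sender<Option<ValidatedContent>>; a Some(..) reply finishes the query, while a None reply (invalid content, or a uTP stream that could not be established) decrements the in-flight validation count so the query resumes polling other peers. The sketch below is a minimal, self-contained illustration of that Option-based verdict channel only; the Query, PollOutcome, ValidatedContent, and NodeId types here are simplified stand-ins invented for this example, not the trin types touched by the patch.

// Minimal sketch of the Some/None verdict channel, assuming simplified stand-in types.
use std::collections::VecDeque;
use std::sync::mpsc::{channel, Receiver, Sender};

// Stand-in for a real node identifier.
type NodeId = u64;

#[derive(Debug)]
struct ValidatedContent {
    content: Vec<u8>,
    sending_peer: NodeId,
}

struct Query {
    // Peers that still need to be asked for the content.
    candidates: VecDeque<NodeId>,
    // Channel the validator reports back on: Some(content) = valid, None = invalid.
    content_tx: Sender<Option<ValidatedContent>>,
    content_rx: Receiver<Option<ValidatedContent>>,
    num_validating: usize,
    finished: Option<ValidatedContent>,
}

enum PollOutcome {
    AskPeer(NodeId), // keep searching
    Paused,          // a validation is in flight; do nothing else
    Finished,
}

impl Query {
    fn new(candidates: Vec<NodeId>) -> Self {
        let (content_tx, content_rx) = channel();
        Self {
            candidates: candidates.into(),
            content_tx,
            content_rx,
            num_validating: 0,
            finished: None,
        }
    }

    /// Called when a peer returns content; hands a sender to the validator task.
    fn start_validation(&mut self) -> Sender<Option<ValidatedContent>> {
        self.num_validating += 1;
        self.content_tx.clone()
    }

    fn poll(&mut self) -> PollOutcome {
        // First, drain any validation verdicts.
        if let Ok(verdict) = self.content_rx.try_recv() {
            match verdict {
                Some(valid) => {
                    self.finished = Some(valid);
                    return PollOutcome::Finished;
                }
                // Invalid content: fall through and resume searching.
                None => self.num_validating -= 1,
            }
        }
        // While a validation is active, pause all other work.
        if self.num_validating > 0 {
            return PollOutcome::Paused;
        }
        match self.candidates.pop_front() {
            Some(peer) => PollOutcome::AskPeer(peer),
            None => PollOutcome::Finished,
        }
    }
}

fn main() {
    let mut query = Query::new(vec![1, 2, 3]);
    // Peer 1 returns content; validation starts and the query pauses.
    assert!(matches!(query.poll(), PollOutcome::AskPeer(1)));
    let tx = query.start_validation();
    assert!(matches!(query.poll(), PollOutcome::Paused));
    // Validation fails: the query resumes with the next peer.
    tx.send(None).unwrap();
    assert!(matches!(query.poll(), PollOutcome::AskPeer(2)));
    // Peer 2's copy validates: the query finishes.
    let tx = query.start_validation();
    tx.send(Some(ValidatedContent { content: vec![0xef], sending_peer: 2 }))
        .unwrap();
    assert!(matches!(query.poll(), PollOutcome::Finished));
    println!("validated: {:?}", query.finished);
}

The same None verdict is what service.rs now sends via valid_content_tx.send(None) when the inbound uTP stream fails and via valid_content_callback.send(None) when validation errors, which is what lets the query keep searching instead of hanging.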