Skip to content

Commit

Permalink
feat: keep searching content after transfer failure
Browse files Browse the repository at this point in the history
If a find content query is transferring content from a peer and that
transfer fails, then resume pinging peers to get the content from another one.

Ways we have seen the content transfer fail:
- peer disappears during uTP transfer
- peer sends invalid content
- peer implements uTP differently

For now, partially to simplify testing and architecture, we only
validate one piece of content at a time. When a piece of content is
returned, we prioritize starting validation. When validation has
started, we pause all other activity: finding new peers, finding the
content elsewhere, and validating more copies of already-found content.
Those things resume only after the active validation fails.

It could be nice to run validation on 2 or more copies of content at a
time, but testing this would be much more complicated (at least with the
current testing approach).
  • Loading branch information
carver committed Sep 23, 2024
1 parent c238aca commit a26e747
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 74 deletions.
215 changes: 144 additions & 71 deletions portalnet/src/find/iterators/findcontent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,15 @@ pub enum FindContentQueryPending<TNodeId> {
// peer that sent the content
peer: TNodeId,
// channel used to send back the validated content info
valid_content_tx: Sender<ValidatedContent<TNodeId>>,
valid_content_tx: Sender<Option<ValidatedContent<TNodeId>>>,
},
Utp {
connection_id: u16,
nodes_to_poke: Vec<TNodeId>,
// peer used to create utp stream
peer: TNodeId,
// channel used to send back the validated content info
valid_content_tx: Sender<ValidatedContent<TNodeId>>,
valid_content_tx: Sender<Option<ValidatedContent<TNodeId>>>,
},
}

Expand Down Expand Up @@ -114,8 +114,8 @@ pub struct FindContentQuery<TNodeId: std::fmt::Display + std::hash::Hash> {
pending_validations: VecDeque<TNodeId>,

/// A channel to receive the final content after validation
content_rx: Receiver<ValidatedContent<TNodeId>>,
content_tx: Sender<ValidatedContent<TNodeId>>,
content_rx: Receiver<Option<ValidatedContent<TNodeId>>>,
content_tx: Sender<Option<ValidatedContent<TNodeId>>>,

/// The received content, after validation.
validated_content: Option<ValidatedContent<TNodeId>>,
Expand Down Expand Up @@ -268,9 +268,14 @@ where

// If the content for this query has been marked as valid, then exit
if let Ok(valid_content) = self.content_rx.try_recv() {
self.progress = QueryProgress::Finished;
self.validated_content = Some(valid_content);
return QueryState::Finished;
if let Some(valid_content) = valid_content {
self.progress = QueryProgress::Finished;
self.validated_content = Some(valid_content);
return QueryState::Finished;
} else {
// The content was marked as invalid. Continue the query.
self.num_validating -= 1;
}
}

if self.num_validating > 0 {
Expand Down Expand Up @@ -321,10 +326,10 @@ where
}
}

QueryPeerState::Succeeded => {}

QueryPeerState::Failed | QueryPeerState::Unresponsive => {
// Skip over unresponsive or failed peers.
QueryPeerState::Succeeded
| QueryPeerState::Failed
| QueryPeerState::Unresponsive => {
// These are all terminal conditions for a peer, so there is no more work to do
}
}
}
Expand Down Expand Up @@ -567,88 +572,157 @@ mod tests {
let max_parallelism = usize::min(query.config.parallelism, remaining.len());

let target = query.target_key.clone();
let mut expected: Vec<_>;
trace!("***** starting termination_and_parallelism test with target {target:?} *****");
// This tracks peers that have returned content, and are waiting to be polled
let mut new_validations: VecDeque<Key<NodeId>> = Default::default();
// This tracks peers that have returned content, have been polled out of the query,
// but validation status is unknown
let mut validating: Vec<_> = vec![];

let found_content: Vec<u8> = vec![0xef];
let mut content_peer = None;

'finished: loop {
if remaining.is_empty() {
let query_states = if !validating.is_empty() {
// Verify that we don't poll peers while actively validating content
match query.poll(now) {
QueryState::WaitingAtCapacity => {}
QueryState::Finished => {}
state => panic!("Expected to pause peer polling while validating, got {state:?}. Still validating these peers: {validating:?}"),
}
vec![]
} else if let Some(k) = new_validations.pop_front() {
let query_state = query.poll(now);
match query_state {
QueryState::Validating(p) => {
trace!("peer {k:?} needs to have content verified");
assert_eq!(&p, k.preimage());
// For now, we only do one validation at a time
assert_eq!(query.poll(now), QueryState::WaitingAtCapacity);
vec![query_state]
}
QueryState::Finished => vec![],
actual => panic!("Expected validation state, got {actual:?}"),
}
} else if remaining.is_empty() {
trace!("ending test: no more peers to pull from");
break;
} else {
// Split off the next (up to) `parallelism` peers, who we expect to poll.
let num_expected = min(max_parallelism, remaining.len());
expected = remaining.drain(..num_expected).collect();
}

// Advance the query for maximum parallelism.
for k in expected.iter() {
match query.poll(now) {
QueryState::Finished => {
trace!("Ending test loop: query state is finished");
break 'finished;
}
QueryState::Waiting(Some(p)) => assert_eq!(&p, k.preimage()),
QueryState::Waiting(None) => panic!("Expected another peer."),
QueryState::WaitingAtCapacity => panic!("Unexpectedly reached capacity."),
QueryState::Validating(p) => assert_eq!(&p, k.preimage()),
}
// Advance the query for maximum parallelism.
remaining
.drain(..num_expected)
.map_while(|k| {
let query_state = query.poll(now);
match query_state {
QueryState::Finished => None,
QueryState::Waiting(Some(p)) => {
assert_eq!(&p, k.preimage());
Some(query_state)
}
QueryState::Waiting(None) => panic!("Expected another peer."),
QueryState::WaitingAtCapacity => {
panic!("Should not go over capacity")
}
QueryState::Validating(_) => {
panic!("Didn't expect new validations.")
}
}
})
.collect::<Vec<_>>()
};

if query.progress == QueryProgress::Finished {
trace!("Ending test loop: query state is finished");
break 'finished;
}
let num_waiting = query.num_waiting;
assert_eq!(num_waiting, expected.len());
let num_waiting = query_states.len();

// Check the bounded parallelism.
if query.at_capacity() {
assert_eq!(query.poll(now), QueryState::WaitingAtCapacity)
}

for (i, k) in expected.iter().enumerate() {
if rng.gen_bool(0.75) {
// With a small probability, return the desired content. Otherwise, return
// a list of random "closer" peers.
if rng.gen_bool(0.05) {
let peer_node_id = k.preimage();
query.on_success(
peer_node_id,
FindContentQueryResponse::Content(found_content.clone()),
);
// The first peer to return the content should be the one reported at
// the end.
if content_peer.is_none() {
content_peer = Some(k.clone());
for (i, state) in query_states.iter().enumerate() {
match state {
QueryState::Waiting(Some(p)) => {
let k = Key::from(*p);
if rng.gen_bool(0.75) {
// With a small probability, return the desired content. Otherwise,
// return a list of random "closer" peers.
if rng.gen_bool(0.05) {
trace!("peer {k:?} returned content");
let peer_node_id = k.preimage();
query.on_success(
peer_node_id,
FindContentQueryResponse::Content(found_content.clone()),
);
// The peer that returned content is now validating.
new_validations.push_back(k);
} else {
let num_closer = rng.gen_range(0..query.config.num_results + 1);
let closer_peers = random_nodes(num_closer).collect::<Vec<_>>();
remaining.extend(closer_peers.iter().map(|x| Key::from(*x)));
query.on_success(
k.preimage(),
FindContentQueryResponse::ClosestNodes(closer_peers),
);
}
} else {
query.on_failure(k.preimage());
}
// Immediately validate the content
match query.pending_validation_result(*peer_node_id) {
FindContentQueryPending::PendingContent {
}
QueryState::Validating(p) => {
let k = Key::from(*p);
trace!("peer {k:?} is queued for validation");
// Start simulated validation process
validating.push(k);
}
_ => panic!("Unexpected query state: {state:?}"),
}
assert_eq!(query.num_waiting, num_waiting - (i + 1));
}

validating.retain(|k| {
if rng.gen_bool(0.3) {
// Mark pending content as valid or not
let node_id = k.preimage();
match query.pending_validation_result(*node_id) {
FindContentQueryPending::PendingContent {
content,
nodes_to_poke: _,
peer,
valid_content_tx,
} => {
let validated_content = ValidatedContent {
content,
nodes_to_poke: _,
peer,
valid_content_tx,
} => {
let validated_content = ValidatedContent {
content,
was_utp_transfer: false,
sending_peer: peer,
};
valid_content_tx.send(validated_content).unwrap();
was_utp_transfer: false,
sending_peer: peer,
};
if rng.gen_bool(0.7) {
trace!("peer {k:?} content is valid");
// Track which peer is first to return valid content.
// That should be the final reported peer.
if content_peer.is_none() {
content_peer = Some(k.clone());
}
valid_content_tx.send(Some(validated_content)).unwrap();
} else {
trace!("peer {k:?} content is invalid");
valid_content_tx.send(None).unwrap();
}
result => panic!("Unexpected result: {result:?}"),
}
} else {
let num_closer = rng.gen_range(0..query.config.num_results + 1);
let closer_peers = random_nodes(num_closer).collect::<Vec<_>>();
remaining.extend(closer_peers.iter().map(|x| Key::from(*x)));
query.on_success(
k.preimage(),
FindContentQueryResponse::ClosestNodes(closer_peers),
);
result => panic!("Unexpected result: {result:?}"),
}
false
} else {
query.on_failure(k.preimage());
// Keep the peer key, to try validating later
trace!("peer {k:?} content is still pending validation");
true
}
assert_eq!(query.num_waiting, num_waiting - (i + 1));
}
});

// Re-sort the remaining expected peers for the next "round".
remaining.sort_by_key(|k| target.distance(k));
Expand All @@ -672,8 +746,7 @@ mod tests {
})
.collect();

let result = query.into_result();
match result {
match query.into_result() {
FindContentQueryResult::ValidContent(validated_content, _cancelled_peers) => {
assert_eq!(validated_content.content, found_content);

Expand All @@ -691,7 +764,7 @@ mod tests {
}
}

QuickCheck::new().tests(100).quickcheck(prop as fn(_) -> _)
QuickCheck::new().tests(300).quickcheck(prop as fn(_) -> _)
}

#[test]
Expand Down
10 changes: 7 additions & 3 deletions portalnet/src/overlay/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,8 @@ where
%e,
"Failed to connect to inbound uTP stream for FindContent"
);
// Indicate to the query that the content is invalid
let _ = valid_content_tx.send(None);
return;
}
};
Expand Down Expand Up @@ -1873,7 +1875,7 @@ where
nodes_to_poke: Vec<NodeId>,
utp_processing: UtpProcessing<TValidator, TStore, TContentKey>,
sending_peer: NodeId,
valid_content_callback: Sender<ValidatedContent<NodeId>>,
valid_content_callback: Sender<Option<ValidatedContent<NodeId>>>,
) {
let mut content = content;
// Operate under assumption that all content in the store is valid
Expand All @@ -1900,6 +1902,8 @@ where
content.key = %content_key,
"Error validating content"
);
// Indicate to the query that the content is invalid
let _ = valid_content_callback.send(None);
return;
}
};
Expand Down Expand Up @@ -1956,11 +1960,11 @@ where
}

if valid_content_callback
.send(ValidatedContent {
.send(Some(ValidatedContent {
content: content.clone(),
was_utp_transfer: utp_transfer,
sending_peer,
})
}))
.is_err()
{
warn!("The content query has exited before the returned content could be marked as valid. Perhaps a timeout, or a parallel copy of the content was validated first.");
Expand Down

0 comments on commit a26e747

Please sign in to comment.