From d64446bd6ace07a46f89e706387b037cc688f703 Mon Sep 17 00:00:00 2001 From: Brendon Fish Date: Fri, 22 Mar 2024 10:25:39 -0400 Subject: [PATCH] retries and fixes --- .../src/network/behaviours/direct_message.rs | 36 ++++++++++++------- .../network/behaviours/exponential_backoff.rs | 6 ++++ crates/libp2p-networking/src/network/def.rs | 18 +++------- crates/libp2p-networking/src/network/mod.rs | 1 - crates/libp2p-networking/src/network/node.rs | 22 +++++++++--- 5 files changed, 51 insertions(+), 32 deletions(-) diff --git a/crates/libp2p-networking/src/network/behaviours/direct_message.rs b/crates/libp2p-networking/src/network/behaviours/direct_message.rs index 1e972b35f5..df4502c578 100644 --- a/crates/libp2p-networking/src/network/behaviours/direct_message.rs +++ b/crates/libp2p-networking/src/network/behaviours/direct_message.rs @@ -1,17 +1,11 @@ -use std::{ - collections::{HashMap, VecDeque}, - task::Poll, -}; - -use libp2p::request_response::cbor::Behaviour; -use libp2p::{ - request_response::{Event, Message, OutboundRequestId, ResponseChannel}, - Multiaddr, -}; +use async_compatibility_layer::art::{async_sleep, async_spawn}; +use async_compatibility_layer::channel::UnboundedSender; +use libp2p::request_response::{Event, Message, OutboundRequestId, ResponseChannel}; use libp2p_identity::PeerId; +use std::collections::HashMap; use tracing::{error, info}; -use crate::network::NetworkEvent; +use crate::network::{ClientRequest, NetworkEvent}; use super::exponential_backoff::ExponentialBackoff; @@ -50,6 +44,7 @@ impl DMBehaviour { pub(crate) fn handle_dm_event( &mut self, event: Event, Vec>, + retry_tx: Option>, ) -> Option { match event { Event::InboundFailure { @@ -72,7 +67,24 @@ impl DMBehaviour { "outbound failure to send message to {:?} with error {:?}", peer, error ); - if let Some(mut req) = self.in_progress_rr.remove(&request_id) {} + if let Some(mut req) = self.in_progress_rr.remove(&request_id) { + if req.retry_count == 0 { + return None; + } + req.retry_count -= 1; + if let Some(retry_tx) = retry_tx { + async_spawn(async move { + async_sleep(req.backoff.next_timeout(false)).await; + let _ = retry_tx + .send(ClientRequest::DirectRequest { + pid: peer, + contents: req.data, + retry_count: req.retry_count, + }) + .await; + }); + } + } None } Event::Message { message, peer, .. } => match message { diff --git a/crates/libp2p-networking/src/network/behaviours/exponential_backoff.rs b/crates/libp2p-networking/src/network/behaviours/exponential_backoff.rs index 2091f2abb2..04c848035b 100644 --- a/crates/libp2p-networking/src/network/behaviours/exponential_backoff.rs +++ b/crates/libp2p-networking/src/network/behaviours/exponential_backoff.rs @@ -52,6 +52,12 @@ impl ExponentialBackoff { } } + /// Return the timeout duration and start the next timeout. + pub fn next_timeout(&mut self, result: bool) -> Duration { + let timeout = self.timeout; + self.start_next(result); + timeout + } /// Whether or not the timeout is expired #[must_use] pub fn is_expired(&self) -> bool { diff --git a/crates/libp2p-networking/src/network/def.rs b/crates/libp2p-networking/src/network/def.rs index b652d798be..6de82414cf 100644 --- a/crates/libp2p-networking/src/network/def.rs +++ b/crates/libp2p-networking/src/network/def.rs @@ -2,10 +2,7 @@ use futures::channel::oneshot::Sender; use libp2p::{ gossipsub::{Behaviour as GossipBehaviour, Event as GossipEvent, IdentTopic}, identify::{Behaviour as IdentifyBehaviour, Event as IdentifyEvent}, - request_response::{ - cbor::{self, Behaviour}, - ResponseChannel, - }, + request_response::{cbor, OutboundRequestId, ResponseChannel}, Multiaddr, }; use libp2p_identity::PeerId; @@ -15,7 +12,6 @@ use tracing::{debug, error}; use super::{ behaviours::{ dht::{DHTBehaviour, DHTEvent, KadPutQuery}, - direct_message::{DMBehaviour, DMEvent, DMRequest}, exponential_backoff::ExponentialBackoff, request_response::{Request, Response}, }, @@ -147,19 +143,13 @@ impl NetworkDef { /// Request/response functions impl NetworkDef { /// Add a direct request for a given peer - pub fn add_direct_request(&mut self, peer_id: PeerId, data: Vec, retry_count: u8) { - let request = DMRequest { - peer_id, - data: data.clone(), - backoff: ExponentialBackoff::default(), - retry_count, - }; - let id = self.direct_message.send_request(&peer_id, data); + pub fn add_direct_request(&mut self, peer_id: PeerId, data: Vec) -> OutboundRequestId { + self.direct_message.send_request(&peer_id, data) } /// Add a direct response for a channel pub fn add_direct_response(&mut self, chan: ResponseChannel>, msg: Vec) { - self.direct_message.send_response(chan, msg); + let _ = self.direct_message.send_response(chan, msg); } } diff --git a/crates/libp2p-networking/src/network/mod.rs b/crates/libp2p-networking/src/network/mod.rs index 0242deb9ce..1f0b836561 100644 --- a/crates/libp2p-networking/src/network/mod.rs +++ b/crates/libp2p-networking/src/network/mod.rs @@ -19,7 +19,6 @@ pub use self::{ use self::behaviours::{ dht::DHTEvent, - direct_message::DMEvent, request_response::{Request, Response}, }; use futures::channel::oneshot::{self, Sender}; diff --git a/crates/libp2p-networking/src/network/node.rs b/crates/libp2p-networking/src/network/node.rs index 32dd25f76a..008f42fbff 100644 --- a/crates/libp2p-networking/src/network/node.rs +++ b/crates/libp2p-networking/src/network/node.rs @@ -23,7 +23,7 @@ use super::{ use crate::network::behaviours::{ dht::{DHTBehaviour, DHTEvent, DHTProgress, KadPutQuery, NUM_REPLICATED_TO_TRUST}, - direct_message::{DMBehaviour, DMEvent}, + direct_message::{DMBehaviour, DMRequest}, exponential_backoff::ExponentialBackoff, request_response::{Request, RequestResponseState, Response}, }; @@ -89,6 +89,8 @@ pub struct NetworkNode { request_response_state: RequestResponseState, /// Handler for direct messages direct_message_state: DMBehaviour, + /// Channel to resend requests, set to Some when we call `spawn_listeners` + resend_tx: Option>, } impl NetworkNode { @@ -337,6 +339,7 @@ impl NetworkNode { listener_id: None, request_response_state: RequestResponseState::default(), direct_message_state: DMBehaviour::default(), + resend_tx: None, }) } @@ -436,7 +439,14 @@ impl NetworkNode { retry_count, } => { info!("pid {:?} adding direct request", self.peer_id); - behaviour.add_direct_request(pid, contents, retry_count); + let id = behaviour.add_direct_request(pid, contents.clone()); + let req = DMRequest { + peer_id: pid, + data: contents, + backoff: ExponentialBackoff::default(), + retry_count, + }; + self.direct_message_state.add_direct_request(req, id); } ClientRequest::DirectResponse(chan, msg) => { behaviour.add_direct_response(chan, msg); @@ -606,9 +616,9 @@ impl NetworkNode { None } }, - NetworkEventInternal::DMEvent(e) => { - self.direct_message_state.handle_dm_event(e) - } + NetworkEventInternal::DMEvent(e) => self + .direct_message_state + .handle_dm_event(e, self.resend_tx.clone()), NetworkEventInternal::RequestResponseEvent(e) => { self.request_response_state.handle_request_response(e) } @@ -668,6 +678,8 @@ impl NetworkNode { let (s_input, s_output) = unbounded::(); let (r_input, r_output) = unbounded::(); + self.resend_tx = Some(s_input.clone()); + async_spawn( async move { let mut fuse = s_output.recv().boxed().fuse();