Skip to content

Commit

Permalink
retries and fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
bfish713 committed Mar 22, 2024
1 parent b8cbf7d commit d64446b
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 32 deletions.
36 changes: 24 additions & 12 deletions crates/libp2p-networking/src/network/behaviours/direct_message.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,11 @@
use std::{
collections::{HashMap, VecDeque},
task::Poll,
};

use libp2p::request_response::cbor::Behaviour;
use libp2p::{
request_response::{Event, Message, OutboundRequestId, ResponseChannel},
Multiaddr,
};
use async_compatibility_layer::art::{async_sleep, async_spawn};
use async_compatibility_layer::channel::UnboundedSender;
use libp2p::request_response::{Event, Message, OutboundRequestId, ResponseChannel};
use libp2p_identity::PeerId;
use std::collections::HashMap;
use tracing::{error, info};

use crate::network::NetworkEvent;
use crate::network::{ClientRequest, NetworkEvent};

use super::exponential_backoff::ExponentialBackoff;

Expand Down Expand Up @@ -50,6 +44,7 @@ impl DMBehaviour {
pub(crate) fn handle_dm_event(
&mut self,
event: Event<Vec<u8>, Vec<u8>>,
retry_tx: Option<UnboundedSender<ClientRequest>>,
) -> Option<NetworkEvent> {
match event {
Event::InboundFailure {
Expand All @@ -72,7 +67,24 @@ impl DMBehaviour {
"outbound failure to send message to {:?} with error {:?}",
peer, error
);
if let Some(mut req) = self.in_progress_rr.remove(&request_id) {}
if let Some(mut req) = self.in_progress_rr.remove(&request_id) {
if req.retry_count == 0 {
return None;
}
req.retry_count -= 1;
if let Some(retry_tx) = retry_tx {
async_spawn(async move {
async_sleep(req.backoff.next_timeout(false)).await;
let _ = retry_tx
.send(ClientRequest::DirectRequest {
pid: peer,
contents: req.data,
retry_count: req.retry_count,
})
.await;
});
}
}
None
}
Event::Message { message, peer, .. } => match message {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ impl ExponentialBackoff {
}
}

/// Hand back the timeout currently stored on this backoff, then advance it.
///
/// `self.timeout` is read *before* `start_next(result)` is invoked, so the returned
/// [`Duration`] is the value that was in effect prior to this call. `result` is passed
/// through to `start_next` unchanged.
pub fn next_timeout(&mut self, result: bool) -> Duration {
    // Capture first: `start_next` takes `&mut self` and may overwrite the stored timeout.
    let current = self.timeout;
    self.start_next(result);
    current
}
/// Whether or not the timeout is expired
#[must_use]
pub fn is_expired(&self) -> bool {
Expand Down
18 changes: 4 additions & 14 deletions crates/libp2p-networking/src/network/def.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@ use futures::channel::oneshot::Sender;
use libp2p::{
gossipsub::{Behaviour as GossipBehaviour, Event as GossipEvent, IdentTopic},
identify::{Behaviour as IdentifyBehaviour, Event as IdentifyEvent},
request_response::{
cbor::{self, Behaviour},
ResponseChannel,
},
request_response::{cbor, OutboundRequestId, ResponseChannel},
Multiaddr,
};
use libp2p_identity::PeerId;
Expand All @@ -15,7 +12,6 @@ use tracing::{debug, error};
use super::{
behaviours::{
dht::{DHTBehaviour, DHTEvent, KadPutQuery},
direct_message::{DMBehaviour, DMEvent, DMRequest},
exponential_backoff::ExponentialBackoff,
request_response::{Request, Response},
},
Expand Down Expand Up @@ -147,19 +143,13 @@ impl NetworkDef {
/// Request/response functions
impl NetworkDef {
/// Add a direct request for a given peer
pub fn add_direct_request(&mut self, peer_id: PeerId, data: Vec<u8>, retry_count: u8) {
let request = DMRequest {
peer_id,
data: data.clone(),
backoff: ExponentialBackoff::default(),
retry_count,
};
let id = self.direct_message.send_request(&peer_id, data);
pub fn add_direct_request(&mut self, peer_id: PeerId, data: Vec<u8>) -> OutboundRequestId {
self.direct_message.send_request(&peer_id, data)
}

/// Add a direct response for a channel
pub fn add_direct_response(&mut self, chan: ResponseChannel<Vec<u8>>, msg: Vec<u8>) {
self.direct_message.send_response(chan, msg);
let _ = self.direct_message.send_response(chan, msg);
}
}

Expand Down
1 change: 0 additions & 1 deletion crates/libp2p-networking/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ pub use self::{

use self::behaviours::{
dht::DHTEvent,
direct_message::DMEvent,
request_response::{Request, Response},
};
use futures::channel::oneshot::{self, Sender};
Expand Down
22 changes: 17 additions & 5 deletions crates/libp2p-networking/src/network/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use super::{

use crate::network::behaviours::{
dht::{DHTBehaviour, DHTEvent, DHTProgress, KadPutQuery, NUM_REPLICATED_TO_TRUST},
direct_message::{DMBehaviour, DMEvent},
direct_message::{DMBehaviour, DMRequest},
exponential_backoff::ExponentialBackoff,
request_response::{Request, RequestResponseState, Response},
};
Expand Down Expand Up @@ -89,6 +89,8 @@ pub struct NetworkNode {
request_response_state: RequestResponseState,
/// Handler for direct messages
direct_message_state: DMBehaviour,
/// Channel to resend requests, set to Some when we call `spawn_listeners`
resend_tx: Option<UnboundedSender<ClientRequest>>,
}

impl NetworkNode {
Expand Down Expand Up @@ -337,6 +339,7 @@ impl NetworkNode {
listener_id: None,
request_response_state: RequestResponseState::default(),
direct_message_state: DMBehaviour::default(),
resend_tx: None,
})
}

Expand Down Expand Up @@ -436,7 +439,14 @@ impl NetworkNode {
retry_count,
} => {
info!("pid {:?} adding direct request", self.peer_id);
behaviour.add_direct_request(pid, contents, retry_count);
let id = behaviour.add_direct_request(pid, contents.clone());
let req = DMRequest {
peer_id: pid,
data: contents,
backoff: ExponentialBackoff::default(),
retry_count,
};
self.direct_message_state.add_direct_request(req, id);
}
ClientRequest::DirectResponse(chan, msg) => {
behaviour.add_direct_response(chan, msg);
Expand Down Expand Up @@ -606,9 +616,9 @@ impl NetworkNode {
None
}
},
NetworkEventInternal::DMEvent(e) => {
self.direct_message_state.handle_dm_event(e)
}
NetworkEventInternal::DMEvent(e) => self
.direct_message_state
.handle_dm_event(e, self.resend_tx.clone()),
NetworkEventInternal::RequestResponseEvent(e) => {
self.request_response_state.handle_request_response(e)
}
Expand Down Expand Up @@ -668,6 +678,8 @@ impl NetworkNode {
let (s_input, s_output) = unbounded::<ClientRequest>();
let (r_input, r_output) = unbounded::<NetworkEvent>();

self.resend_tx = Some(s_input.clone());

async_spawn(
async move {
let mut fuse = s_output.recv().boxed().fuse();
Expand Down

0 comments on commit d64446b

Please sign in to comment.