From b8cbf7da1b9529282f0f0bcee69370b73c48be70 Mon Sep 17 00:00:00 2001 From: Brendon Fish Date: Thu, 21 Mar 2024 23:57:33 -0400 Subject: [PATCH] Removing behaviour impl for DM --- .../src/network/behaviours/direct_message.rs | 208 ++---------------- crates/libp2p-networking/src/network/def.rs | 26 ++- crates/libp2p-networking/src/network/mod.rs | 2 +- crates/libp2p-networking/src/network/node.rs | 16 +- 4 files changed, 38 insertions(+), 214 deletions(-) diff --git a/crates/libp2p-networking/src/network/behaviours/direct_message.rs b/crates/libp2p-networking/src/network/behaviours/direct_message.rs index f156e2ad96..1e972b35f5 100644 --- a/crates/libp2p-networking/src/network/behaviours/direct_message.rs +++ b/crates/libp2p-networking/src/network/behaviours/direct_message.rs @@ -6,15 +6,17 @@ use std::{ use libp2p::request_response::cbor::Behaviour; use libp2p::{ request_response::{Event, Message, OutboundRequestId, ResponseChannel}, - swarm::{NetworkBehaviour, THandlerInEvent, THandlerOutEvent, ToSwarm}, Multiaddr, }; use libp2p_identity::PeerId; use tracing::{error, info}; +use crate::network::NetworkEvent; + use super::exponential_backoff::ExponentialBackoff; /// Request to direct message a peert +#[derive(Debug)] pub struct DMRequest { /// the recv-ers peer id pub peer_id: PeerId, @@ -28,15 +30,10 @@ pub struct DMRequest { /// Wrapper metadata around libp2p's request response /// usage: direct message peer +#[derive(Debug, Default)] pub struct DMBehaviour { - /// The wrapped behaviour - request_response: Behaviour, Vec>, /// In progress queries in_progress_rr: HashMap, - /// Failed queries to be retried - failed_rr: VecDeque, - /// lsit of out events for parent behaviour - out_event_queue: Vec, } /// Lilst of direct message output events @@ -50,7 +47,10 @@ pub enum DMEvent { impl DMBehaviour { /// handle a direct message event - fn handle_dm_event(&mut self, event: Event, Vec>) { + pub(crate) fn handle_dm_event( + &mut self, + event: Event, Vec>, + ) -> Option { match event { Event::InboundFailure { peer, @@ -61,6 +61,7 @@ impl DMBehaviour { "inbound failure to send message to {:?} with error {:?}", peer, error ); + None } Event::OutboundFailure { peer, @@ -71,10 +72,8 @@ impl DMBehaviour { "outbound failure to send message to {:?} with error {:?}", peer, error ); - if let Some(mut req) = self.in_progress_rr.remove(&request_id) { - req.backoff.start_next(false); - self.failed_rr.push_back(req); - } + if let Some(mut req) = self.in_progress_rr.remove(&request_id) {} + None } Event::Message { message, peer, .. } => match message { Message::Request { @@ -85,8 +84,7 @@ impl DMBehaviour { info!("recv-ed DIRECT REQUEST {:?}", msg); // receiver, not initiator. // don't track. If we are disconnected, sender will reinitiate - self.out_event_queue - .push(DMEvent::DirectRequest(msg, peer, channel)); + Some(NetworkEvent::DirectRequest(msg, peer, channel)) } Message::Response { request_id, @@ -95,206 +93,32 @@ impl DMBehaviour { // success, finished. if let Some(req) = self.in_progress_rr.remove(&request_id) { info!("recv-ed DIRECT RESPONSE {:?}", msg); - self.out_event_queue - .push(DMEvent::DirectResponse(msg, req.peer_id)); + Some(NetworkEvent::DirectResponse(msg, req.peer_id)) } else { error!("recv-ed a direct response, but is no longer tracking message!"); + None } } }, e @ Event::ResponseSent { .. } => { info!(?e, " sending response"); + None } } } } -impl NetworkBehaviour for DMBehaviour { - type ConnectionHandler = , Vec> as NetworkBehaviour>::ConnectionHandler; - - type ToSwarm = DMEvent; - - fn on_swarm_event(&mut self, event: libp2p::swarm::derive_prelude::FromSwarm<'_>) { - self.request_response.on_swarm_event(event); - } - - fn on_connection_handler_event( - &mut self, - peer_id: PeerId, - connection_id: libp2p::swarm::derive_prelude::ConnectionId, - event: THandlerOutEvent, - ) { - self.request_response - .on_connection_handler_event(peer_id, connection_id, event); - } - - fn poll( - &mut self, - cx: &mut std::task::Context<'_>, - ) -> Poll>> { - let mut retry_req_indices = Vec::new(); - for (idx, req) in self.failed_rr.iter().enumerate() { - if req.backoff.is_expired() { - retry_req_indices.push(idx); - } - } - let _ = retry_req_indices.into_iter().map(|idx| { - let req = self.failed_rr.remove(idx).unwrap(); - self.add_direct_request(req); - }); - while let Poll::Ready(ready) = NetworkBehaviour::poll(&mut self.request_response, cx) { - match ready { - // NOTE: this generates request - ToSwarm::GenerateEvent(e) => { - self.handle_dm_event(e); - } - ToSwarm::Dial { opts } => { - return Poll::Ready(ToSwarm::Dial { opts }); - } - ToSwarm::NotifyHandler { - peer_id, - handler, - event, - } => { - return Poll::Ready(ToSwarm::NotifyHandler { - peer_id, - handler, - event, - }); - } - ToSwarm::CloseConnection { - peer_id, - connection, - } => { - return Poll::Ready(ToSwarm::CloseConnection { - peer_id, - connection, - }); - } - ToSwarm::ListenOn { opts } => { - return Poll::Ready(ToSwarm::ListenOn { opts }); - } - ToSwarm::RemoveListener { id } => { - return Poll::Ready(ToSwarm::RemoveListener { id }); - } - ToSwarm::NewExternalAddrCandidate(c) => { - return Poll::Ready(ToSwarm::NewExternalAddrCandidate(c)); - } - ToSwarm::ExternalAddrConfirmed(c) => { - return Poll::Ready(ToSwarm::ExternalAddrConfirmed(c)); - } - ToSwarm::ExternalAddrExpired(c) => { - return Poll::Ready(ToSwarm::ExternalAddrExpired(c)); - } - e => { - error!("UNHANDLED NEW SWARM VARIANT: {e:?}"); - } - } - } - if !self.out_event_queue.is_empty() { - return Poll::Ready(ToSwarm::GenerateEvent(self.out_event_queue.remove(0))); - } - Poll::Pending - } - - fn handle_pending_inbound_connection( - &mut self, - connection_id: libp2p::swarm::ConnectionId, - local_addr: &Multiaddr, - remote_addr: &Multiaddr, - ) -> Result<(), libp2p::swarm::ConnectionDenied> { - self.request_response.handle_pending_inbound_connection( - connection_id, - local_addr, - remote_addr, - ) - } - - fn handle_established_inbound_connection( - &mut self, - connection_id: libp2p::swarm::ConnectionId, - peer: PeerId, - local_addr: &Multiaddr, - remote_addr: &Multiaddr, - ) -> Result, libp2p::swarm::ConnectionDenied> { - self.request_response.handle_established_inbound_connection( - connection_id, - peer, - local_addr, - remote_addr, - ) - } - - fn handle_pending_outbound_connection( - &mut self, - connection_id: libp2p::swarm::ConnectionId, - maybe_peer: Option, - addresses: &[Multiaddr], - effective_role: libp2p::core::Endpoint, - ) -> Result, libp2p::swarm::ConnectionDenied> { - self.request_response.handle_pending_outbound_connection( - connection_id, - maybe_peer, - addresses, - effective_role, - ) - } - - fn handle_established_outbound_connection( - &mut self, - connection_id: libp2p::swarm::ConnectionId, - peer: PeerId, - addr: &Multiaddr, - role_override: libp2p::core::Endpoint, - ) -> Result, libp2p::swarm::ConnectionDenied> { - self.request_response - .handle_established_outbound_connection(connection_id, peer, addr, role_override) - } -} - impl DMBehaviour { - /// Create new behaviour based on request response - #[must_use] - pub fn new(request_response: Behaviour, Vec>) -> Self { - Self { - request_response, - in_progress_rr: HashMap::default(), - failed_rr: VecDeque::default(), - out_event_queue: Vec::default(), - } - } - - /// Add address to request response behaviour - pub fn add_address(&mut self, peer_id: &PeerId, address: Multiaddr) { - self.request_response.add_address(peer_id, address); - } - - /// Remove address from request response behaviour - pub fn remove_address(&mut self, peer_id: &PeerId, address: &Multiaddr) { - self.request_response.remove_address(peer_id, address); - } - /// Add a direct request for a given peer - pub fn add_direct_request(&mut self, mut req: DMRequest) { + pub fn add_direct_request(&mut self, mut req: DMRequest, request_id: OutboundRequestId) { if req.retry_count == 0 { return; } req.retry_count -= 1; - let request_id = self - .request_response - .send_request(&req.peer_id, req.data.clone()); info!("direct message request with id {:?}", request_id); self.in_progress_rr.insert(request_id, req); } - - /// Add a direct response for a channel - pub fn add_direct_response(&mut self, chan: ResponseChannel>, msg: Vec) { - let res = self.request_response.send_response(chan, msg); - if let Err(e) = res { - error!("Error replying to direct message. {:?}", e); - } - } } diff --git a/crates/libp2p-networking/src/network/def.rs b/crates/libp2p-networking/src/network/def.rs index 9ae2f70d75..b652d798be 100644 --- a/crates/libp2p-networking/src/network/def.rs +++ b/crates/libp2p-networking/src/network/def.rs @@ -2,7 +2,10 @@ use futures::channel::oneshot::Sender; use libp2p::{ gossipsub::{Behaviour as GossipBehaviour, Event as GossipEvent, IdentTopic}, identify::{Behaviour as IdentifyBehaviour, Event as IdentifyEvent}, - request_response::{cbor, ResponseChannel}, + request_response::{ + cbor::{self, Behaviour}, + ResponseChannel, + }, Multiaddr, }; use libp2p_identity::PeerId; @@ -47,7 +50,7 @@ pub struct NetworkDef { /// purpose: directly messaging peer #[debug(skip)] - pub direct_message: DMBehaviour, + pub direct_message: libp2p::request_response::cbor::Behaviour, Vec>, /// Behaviour for requesting and receiving data #[debug(skip)] @@ -61,7 +64,7 @@ impl NetworkDef { gossipsub: GossipBehaviour, dht: DHTBehaviour, identify: IdentifyBehaviour, - direct_message: DMBehaviour, + direct_message: cbor::Behaviour, Vec>, request_response: cbor::Behaviour, ) -> NetworkDef { Self { @@ -147,22 +150,16 @@ impl NetworkDef { pub fn add_direct_request(&mut self, peer_id: PeerId, data: Vec, retry_count: u8) { let request = DMRequest { peer_id, - data, + data: data.clone(), backoff: ExponentialBackoff::default(), retry_count, }; - self.direct_message.add_direct_request(request); + let id = self.direct_message.send_request(&peer_id, data); } /// Add a direct response for a channel pub fn add_direct_response(&mut self, chan: ResponseChannel>, msg: Vec) { - self.direct_message.add_direct_response(chan, msg); - } -} - -impl From for NetworkEventInternal { - fn from(event: DMEvent) -> Self { - Self::DMEvent(event) + self.direct_message.send_response(chan, msg); } } @@ -183,6 +180,11 @@ impl From for NetworkEventInternal { Self::IdentifyEvent(Box::new(event)) } } +impl From, Vec>> for NetworkEventInternal { + fn from(value: libp2p::request_response::Event, Vec>) -> Self { + Self::DMEvent(value) + } +} impl From> for NetworkEventInternal { fn from(event: libp2p::request_response::Event) -> Self { diff --git a/crates/libp2p-networking/src/network/mod.rs b/crates/libp2p-networking/src/network/mod.rs index 4a28ad441a..0242deb9ce 100644 --- a/crates/libp2p-networking/src/network/mod.rs +++ b/crates/libp2p-networking/src/network/mod.rs @@ -188,7 +188,7 @@ pub enum NetworkEventInternal { /// a gossip event GossipEvent(Box), /// a direct message event - DMEvent(DMEvent), + DMEvent(libp2p::request_response::Event, Vec>), /// a request response event RequestResponseEvent(libp2p::request_response::Event), } diff --git a/crates/libp2p-networking/src/network/node.rs b/crates/libp2p-networking/src/network/node.rs index c91dc7c397..32dd25f76a 100644 --- a/crates/libp2p-networking/src/network/node.rs +++ b/crates/libp2p-networking/src/network/node.rs @@ -87,6 +87,8 @@ pub struct NetworkNode { listener_id: Option, /// Handler for requests and response behavior events. request_response_state: RequestResponseState, + /// Handler for direct messages + direct_message_state: DMBehaviour, } impl NetworkNode { @@ -301,7 +303,7 @@ impl NetworkNode { .unwrap_or_else(|| NonZeroUsize::new(4).unwrap()), ), identify, - DMBehaviour::new(direct_message), + direct_message, request_response, ); @@ -334,6 +336,7 @@ impl NetworkNode { config, listener_id: None, request_response_state: RequestResponseState::default(), + direct_message_state: DMBehaviour::default(), }) } @@ -603,14 +606,9 @@ impl NetworkNode { None } }, - NetworkEventInternal::DMEvent(e) => Some(match e { - DMEvent::DirectRequest(data, pid, chan) => { - NetworkEvent::DirectRequest(data, pid, chan) - } - DMEvent::DirectResponse(data, pid) => { - NetworkEvent::DirectResponse(data, pid) - } - }), + NetworkEventInternal::DMEvent(e) => { + self.direct_message_state.handle_dm_event(e) + } NetworkEventInternal::RequestResponseEvent(e) => { self.request_response_state.handle_request_response(e) }