Skip to content

Commit

Permalink
Removing behaviour impl for DM
Browse files Browse the repository at this point in the history
  • Loading branch information
bfish713 committed Mar 22, 2024
1 parent 98ac462 commit b8cbf7d
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 214 deletions.
208 changes: 16 additions & 192 deletions crates/libp2p-networking/src/network/behaviours/direct_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,17 @@ use std::{
use libp2p::request_response::cbor::Behaviour;
use libp2p::{
request_response::{Event, Message, OutboundRequestId, ResponseChannel},
swarm::{NetworkBehaviour, THandlerInEvent, THandlerOutEvent, ToSwarm},
Multiaddr,
};
use libp2p_identity::PeerId;
use tracing::{error, info};

use crate::network::NetworkEvent;

use super::exponential_backoff::ExponentialBackoff;

/// Request to direct message a peert
#[derive(Debug)]
pub struct DMRequest {
/// the recv-ers peer id
pub peer_id: PeerId,
Expand All @@ -28,15 +30,10 @@ pub struct DMRequest {

/// Wrapper metadata around libp2p's request response
/// usage: direct message peer
#[derive(Debug, Default)]
pub struct DMBehaviour {
/// The wrapped behaviour
request_response: Behaviour<Vec<u8>, Vec<u8>>,
/// In progress queries
in_progress_rr: HashMap<OutboundRequestId, DMRequest>,
/// Failed queries to be retried
failed_rr: VecDeque<DMRequest>,
/// lsit of out events for parent behaviour
out_event_queue: Vec<DMEvent>,
}

/// Lilst of direct message output events
Expand All @@ -50,7 +47,10 @@ pub enum DMEvent {

impl DMBehaviour {
/// handle a direct message event
fn handle_dm_event(&mut self, event: Event<Vec<u8>, Vec<u8>>) {
pub(crate) fn handle_dm_event(
&mut self,
event: Event<Vec<u8>, Vec<u8>>,
) -> Option<NetworkEvent> {
match event {
Event::InboundFailure {
peer,
Expand All @@ -61,6 +61,7 @@ impl DMBehaviour {
"inbound failure to send message to {:?} with error {:?}",
peer, error
);
None
}
Event::OutboundFailure {
peer,
Expand All @@ -71,10 +72,8 @@ impl DMBehaviour {
"outbound failure to send message to {:?} with error {:?}",
peer, error
);
if let Some(mut req) = self.in_progress_rr.remove(&request_id) {
req.backoff.start_next(false);
self.failed_rr.push_back(req);
}
if let Some(mut req) = self.in_progress_rr.remove(&request_id) {}
None
}
Event::Message { message, peer, .. } => match message {
Message::Request {
Expand All @@ -85,8 +84,7 @@ impl DMBehaviour {
info!("recv-ed DIRECT REQUEST {:?}", msg);
// receiver, not initiator.
// don't track. If we are disconnected, sender will reinitiate
self.out_event_queue
.push(DMEvent::DirectRequest(msg, peer, channel));
Some(NetworkEvent::DirectRequest(msg, peer, channel))
}
Message::Response {
request_id,
Expand All @@ -95,206 +93,32 @@ impl DMBehaviour {
// success, finished.
if let Some(req) = self.in_progress_rr.remove(&request_id) {
info!("recv-ed DIRECT RESPONSE {:?}", msg);
self.out_event_queue
.push(DMEvent::DirectResponse(msg, req.peer_id));
Some(NetworkEvent::DirectResponse(msg, req.peer_id))
} else {
error!("recv-ed a direct response, but is no longer tracking message!");
None
}
}
},
e @ Event::ResponseSent { .. } => {
info!(?e, " sending response");
None
}
}
}
}

impl NetworkBehaviour for DMBehaviour {
type ConnectionHandler = <Behaviour<Vec<u8>, Vec<u8>> as NetworkBehaviour>::ConnectionHandler;

type ToSwarm = DMEvent;

fn on_swarm_event(&mut self, event: libp2p::swarm::derive_prelude::FromSwarm<'_>) {
self.request_response.on_swarm_event(event);
}

fn on_connection_handler_event(
&mut self,
peer_id: PeerId,
connection_id: libp2p::swarm::derive_prelude::ConnectionId,
event: THandlerOutEvent<Self>,
) {
self.request_response
.on_connection_handler_event(peer_id, connection_id, event);
}

fn poll(
&mut self,
cx: &mut std::task::Context<'_>,
) -> Poll<ToSwarm<DMEvent, THandlerInEvent<Self>>> {
let mut retry_req_indices = Vec::new();
for (idx, req) in self.failed_rr.iter().enumerate() {
if req.backoff.is_expired() {
retry_req_indices.push(idx);
}
}
let _ = retry_req_indices.into_iter().map(|idx| {
let req = self.failed_rr.remove(idx).unwrap();
self.add_direct_request(req);
});
while let Poll::Ready(ready) = NetworkBehaviour::poll(&mut self.request_response, cx) {
match ready {
// NOTE: this generates request
ToSwarm::GenerateEvent(e) => {
self.handle_dm_event(e);
}
ToSwarm::Dial { opts } => {
return Poll::Ready(ToSwarm::Dial { opts });
}
ToSwarm::NotifyHandler {
peer_id,
handler,
event,
} => {
return Poll::Ready(ToSwarm::NotifyHandler {
peer_id,
handler,
event,
});
}
ToSwarm::CloseConnection {
peer_id,
connection,
} => {
return Poll::Ready(ToSwarm::CloseConnection {
peer_id,
connection,
});
}
ToSwarm::ListenOn { opts } => {
return Poll::Ready(ToSwarm::ListenOn { opts });
}
ToSwarm::RemoveListener { id } => {
return Poll::Ready(ToSwarm::RemoveListener { id });
}
ToSwarm::NewExternalAddrCandidate(c) => {
return Poll::Ready(ToSwarm::NewExternalAddrCandidate(c));
}
ToSwarm::ExternalAddrConfirmed(c) => {
return Poll::Ready(ToSwarm::ExternalAddrConfirmed(c));
}
ToSwarm::ExternalAddrExpired(c) => {
return Poll::Ready(ToSwarm::ExternalAddrExpired(c));
}
e => {
error!("UNHANDLED NEW SWARM VARIANT: {e:?}");
}
}
}
if !self.out_event_queue.is_empty() {
return Poll::Ready(ToSwarm::GenerateEvent(self.out_event_queue.remove(0)));
}
Poll::Pending
}

fn handle_pending_inbound_connection(
&mut self,
connection_id: libp2p::swarm::ConnectionId,
local_addr: &Multiaddr,
remote_addr: &Multiaddr,
) -> Result<(), libp2p::swarm::ConnectionDenied> {
self.request_response.handle_pending_inbound_connection(
connection_id,
local_addr,
remote_addr,
)
}

fn handle_established_inbound_connection(
&mut self,
connection_id: libp2p::swarm::ConnectionId,
peer: PeerId,
local_addr: &Multiaddr,
remote_addr: &Multiaddr,
) -> Result<libp2p::swarm::THandler<Self>, libp2p::swarm::ConnectionDenied> {
self.request_response.handle_established_inbound_connection(
connection_id,
peer,
local_addr,
remote_addr,
)
}

fn handle_pending_outbound_connection(
&mut self,
connection_id: libp2p::swarm::ConnectionId,
maybe_peer: Option<PeerId>,
addresses: &[Multiaddr],
effective_role: libp2p::core::Endpoint,
) -> Result<Vec<Multiaddr>, libp2p::swarm::ConnectionDenied> {
self.request_response.handle_pending_outbound_connection(
connection_id,
maybe_peer,
addresses,
effective_role,
)
}

fn handle_established_outbound_connection(
&mut self,
connection_id: libp2p::swarm::ConnectionId,
peer: PeerId,
addr: &Multiaddr,
role_override: libp2p::core::Endpoint,
) -> Result<libp2p::swarm::THandler<Self>, libp2p::swarm::ConnectionDenied> {
self.request_response
.handle_established_outbound_connection(connection_id, peer, addr, role_override)
}
}

impl DMBehaviour {
/// Create new behaviour based on request response
#[must_use]
pub fn new(request_response: Behaviour<Vec<u8>, Vec<u8>>) -> Self {
Self {
request_response,
in_progress_rr: HashMap::default(),
failed_rr: VecDeque::default(),
out_event_queue: Vec::default(),
}
}

/// Add address to request response behaviour
pub fn add_address(&mut self, peer_id: &PeerId, address: Multiaddr) {
self.request_response.add_address(peer_id, address);
}

/// Remove address from request response behaviour
pub fn remove_address(&mut self, peer_id: &PeerId, address: &Multiaddr) {
self.request_response.remove_address(peer_id, address);
}

/// Add a direct request for a given peer
pub fn add_direct_request(&mut self, mut req: DMRequest) {
pub fn add_direct_request(&mut self, mut req: DMRequest, request_id: OutboundRequestId) {
if req.retry_count == 0 {
return;
}

req.retry_count -= 1;

let request_id = self
.request_response
.send_request(&req.peer_id, req.data.clone());
info!("direct message request with id {:?}", request_id);

self.in_progress_rr.insert(request_id, req);
}

/// Add a direct response for a channel
pub fn add_direct_response(&mut self, chan: ResponseChannel<Vec<u8>>, msg: Vec<u8>) {
let res = self.request_response.send_response(chan, msg);
if let Err(e) = res {
error!("Error replying to direct message. {:?}", e);
}
}
}
26 changes: 14 additions & 12 deletions crates/libp2p-networking/src/network/def.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ use futures::channel::oneshot::Sender;
use libp2p::{
gossipsub::{Behaviour as GossipBehaviour, Event as GossipEvent, IdentTopic},
identify::{Behaviour as IdentifyBehaviour, Event as IdentifyEvent},
request_response::{cbor, ResponseChannel},
request_response::{
cbor::{self, Behaviour},
ResponseChannel,
},
Multiaddr,
};
use libp2p_identity::PeerId;
Expand Down Expand Up @@ -47,7 +50,7 @@ pub struct NetworkDef {

/// purpose: directly messaging peer
#[debug(skip)]
pub direct_message: DMBehaviour,
pub direct_message: libp2p::request_response::cbor::Behaviour<Vec<u8>, Vec<u8>>,

/// Behaviour for requesting and receiving data
#[debug(skip)]
Expand All @@ -61,7 +64,7 @@ impl NetworkDef {
gossipsub: GossipBehaviour,
dht: DHTBehaviour,
identify: IdentifyBehaviour,
direct_message: DMBehaviour,
direct_message: cbor::Behaviour<Vec<u8>, Vec<u8>>,
request_response: cbor::Behaviour<Request, Response>,
) -> NetworkDef {
Self {
Expand Down Expand Up @@ -147,22 +150,16 @@ impl NetworkDef {
pub fn add_direct_request(&mut self, peer_id: PeerId, data: Vec<u8>, retry_count: u8) {
let request = DMRequest {
peer_id,
data,
data: data.clone(),
backoff: ExponentialBackoff::default(),
retry_count,
};
self.direct_message.add_direct_request(request);
let id = self.direct_message.send_request(&peer_id, data);
}

/// Add a direct response for a channel
pub fn add_direct_response(&mut self, chan: ResponseChannel<Vec<u8>>, msg: Vec<u8>) {
self.direct_message.add_direct_response(chan, msg);
}
}

impl From<DMEvent> for NetworkEventInternal {
fn from(event: DMEvent) -> Self {
Self::DMEvent(event)
self.direct_message.send_response(chan, msg);
}
}

Expand All @@ -183,6 +180,11 @@ impl From<IdentifyEvent> for NetworkEventInternal {
Self::IdentifyEvent(Box::new(event))
}
}
impl From<libp2p::request_response::Event<Vec<u8>, Vec<u8>>> for NetworkEventInternal {
fn from(value: libp2p::request_response::Event<Vec<u8>, Vec<u8>>) -> Self {
Self::DMEvent(value)
}
}

impl From<libp2p::request_response::Event<Request, Response>> for NetworkEventInternal {
fn from(event: libp2p::request_response::Event<Request, Response>) -> Self {
Expand Down
2 changes: 1 addition & 1 deletion crates/libp2p-networking/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ pub enum NetworkEventInternal {
/// a gossip event
GossipEvent(Box<GossipEvent>),
/// a direct message event
DMEvent(DMEvent),
DMEvent(libp2p::request_response::Event<Vec<u8>, Vec<u8>>),
/// a request response event
RequestResponseEvent(libp2p::request_response::Event<Request, Response>),
}
Expand Down
Loading

0 comments on commit b8cbf7d

Please sign in to comment.