Skip to content

Commit

Permalink
chore: add future to process responses
Browse files Browse the repository at this point in the history
  • Loading branch information
dariusc93 committed Dec 7, 2024
1 parent 2fcd013 commit ea87613
Showing 1 changed file with 22 additions and 6 deletions.
28 changes: 22 additions & 6 deletions extensions/warp-ipfs/src/store/discovery.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use futures::{FutureExt, SinkExt, Stream, StreamExt};
use pollable_map::futures::FutureMap;
use rust_ipfs::libp2p::request_response::InboundRequestId;
use rust_ipfs::{
libp2p::swarm::dial_opts::DialOpts, ConnectionEvents, Ipfs, Keypair, Multiaddr,
PeerConnectionEvents, PeerId,
Expand Down Expand Up @@ -502,7 +504,9 @@ struct DiscoveryTask {
command_rx: futures::channel::mpsc::Receiver<DiscoveryCommand>,
connection_event: BoxStream<'static, ConnectionEvents>,

discovery_request_st: BoxStream<'static, (PeerId, Bytes, oneshot::Sender<Bytes>)>,
discovery_request_st: BoxStream<'static, (PeerId, InboundRequestId, Bytes)>,

responsing_fut: FutureMap<(PeerId, InboundRequestId), BoxFuture<'static, Result<(), anyhow::Error>>>,

discovery_fut: Option<BoxFuture<'static, Result<Vec<PeerId>, Error>>>,

Expand Down Expand Up @@ -537,6 +541,7 @@ impl DiscoveryTask {
config: config.clone(),
relays,
peers: StreamMap::new(),
responsing_fut: FutureMap::new(),
broadcast_tx,
command_rx,
connection_event,
Expand Down Expand Up @@ -780,15 +785,18 @@ impl Future for DiscoveryTask {
}
}

while let Poll::Ready(Some((peer_id, request, response))) =
while let Poll::Ready(Some((peer_id, id, request))) =
self.discovery_request_st.poll_next_unpin(cx)
{
let ipfs = self.ipfs.clone();
let Ok(payload) = PayloadMessage::from_bytes(&request) else {
let pl = PayloadBuilder::new(&self.keypair, DiscoveryResponse::InvalidRequest)
.build()
.expect("valid payload");
let bytes = pl.to_bytes().expect("valid payload");
_ = response.send(bytes);
self.responsing_fut.insert((peer_id, id), async move {
ipfs.send_response(peer_id, id, bytes).await
}.boxed());
continue;
};

Expand All @@ -798,7 +806,9 @@ impl Future for DiscoveryTask {
.build()
.expect("valid payload");
let bytes = pl.to_bytes().expect("valid payload");
_ = response.send(bytes);
self.responsing_fut.insert((peer_id, id), async move {
ipfs.send_response(peer_id, id, bytes).await
}.boxed());
continue;
}

Expand All @@ -811,8 +821,14 @@ impl Future for DiscoveryTask {
}
};

if response.send(bytes).is_err() {
tracing::warn!(%peer_id, "unable to respond to peer due to request being dropped.");
self.responsing_fut.insert((peer_id, id), async move {
ipfs.send_response(peer_id, id, bytes).await
}.boxed());
}

while let Poll::Ready(Some(((peer_id, id), result))) = self.responsing_fut.poll_next_unpin(cx) {
if let Err(e) = result {
tracing::error!(%peer_id, error = %e, %id, "unable to respond to peer request");
}
}

Expand Down

0 comments on commit ea87613

Please sign in to comment.