From 4a30089f2b4c6b5c8d05ffcfc10afbe5f4a8cf4e Mon Sep 17 00:00:00 2001 From: Hannes Karppila Date: Fri, 22 Dec 2023 14:30:38 +0200 Subject: [PATCH 1/4] Simplify p2p network message serialization --- crates/services/p2p/src/behavior.rs | 14 +- crates/services/p2p/src/codecs.rs | 29 +--- crates/services/p2p/src/codecs/postcard.rs | 140 +++--------------- crates/services/p2p/src/p2p_service.rs | 122 ++++++--------- .../p2p/src/request_response/messages.rs | 36 +---- crates/services/p2p/src/service.rs | 36 +++-- 6 files changed, 108 insertions(+), 269 deletions(-) diff --git a/crates/services/p2p/src/behavior.rs b/crates/services/p2p/src/behavior.rs index 6749bf7c5e..9c5e2653ab 100644 --- a/crates/services/p2p/src/behavior.rs +++ b/crates/services/p2p/src/behavior.rs @@ -14,8 +14,8 @@ use crate::{ PeerReportEvent, }, request_response::messages::{ - NetworkResponse, RequestMessage, + ResponseMessage, }, }; use fuel_core_types::fuel_types::BlockHeight; @@ -46,7 +46,7 @@ pub enum FuelBehaviourEvent { Discovery(KademliaEvent), PeerReport(PeerReportEvent), Gossipsub(GossipsubEvent), - RequestResponse(RequestResponseEvent), + RequestResponse(RequestResponseEvent), } /// Handles all p2p protocols needed for Fuel. @@ -144,9 +144,9 @@ impl FuelBehaviour { pub fn send_response_msg( &mut self, - channel: ResponseChannel, - message: NetworkResponse, - ) -> Result<(), NetworkResponse> { + channel: ResponseChannel, + message: ResponseMessage, + ) -> Result<(), ResponseMessage> { self.request_response.send_response(channel, message) } @@ -208,8 +208,8 @@ impl From for FuelBehaviourEvent { } } -impl From> for FuelBehaviourEvent { - fn from(event: RequestResponseEvent) -> Self { +impl From> for FuelBehaviourEvent { + fn from(event: RequestResponseEvent) -> Self { FuelBehaviourEvent::RequestResponse(event) } } diff --git a/crates/services/p2p/src/codecs.rs b/crates/services/p2p/src/codecs.rs index 542236671b..164a8c1755 100644 --- a/crates/services/p2p/src/codecs.rs +++ b/crates/services/p2p/src/codecs.rs @@ -7,8 +7,6 @@ use crate::{ GossipsubMessage, }, request_response::messages::{ - NetworkResponse, - OutboundResponse, RequestMessage, ResponseMessage, }, @@ -30,37 +28,14 @@ pub trait GossipsubCodec { ) -> Result; } -pub trait RequestResponseConverter { - /// Response that is ready to be converted into NetworkResponse - type OutboundResponse; - /// Response that is sent over the network - type NetworkResponse; - /// Final Response Message deserialized from IntermediateResponse - type ResponseMessage; - - fn convert_to_network_response( - &self, - res_msg: &Self::OutboundResponse, - ) -> Result; - - fn convert_to_response( - &self, - inter_msg: &Self::NetworkResponse, - ) -> Result; -} - /// Main Codec trait /// Needs to be implemented and provided to FuelBehaviour pub trait NetworkCodec: GossipsubCodec< RequestMessage = GossipsubBroadcastRequest, ResponseMessage = GossipsubMessage, - > + RequestResponseCodec - + RequestResponseConverter< - NetworkResponse = NetworkResponse, - OutboundResponse = OutboundResponse, - ResponseMessage = ResponseMessage, - > + Clone + > + RequestResponseCodec + + Clone + Send + 'static { diff --git a/crates/services/p2p/src/codecs/postcard.rs b/crates/services/p2p/src/codecs/postcard.rs index 77f8ad9290..124e80333a 100644 --- a/crates/services/p2p/src/codecs/postcard.rs +++ b/crates/services/p2p/src/codecs/postcard.rs @@ -1,7 +1,6 @@ use super::{ GossipsubCodec, NetworkCodec, - RequestResponseConverter, }; use crate::{ gossipsub::messages::{ @@ -10,8 +9,6 @@ use crate::{ GossipsubMessage, }, request_response::messages::{ - NetworkResponse, - OutboundResponse, RequestMessage, ResponseMessage, MAX_REQUEST_SIZE, @@ -39,6 +36,18 @@ use serde::{ }; use std::io; +/// Helper method for decoding data +/// Reusable across `RequestResponseCodec` and `GossipsubCodec` +fn deserialize<'a, R: Deserialize<'a>>(encoded_data: &'a [u8]) -> Result { + postcard::from_bytes(encoded_data) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string())) +} + +fn serialize(data: &D) -> Result, io::Error> { + postcard::to_stdvec(&data) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string())) +} + #[derive(Debug, Clone)] pub struct PostcardCodec { /// Used for `max_size` parameter when reading Response Message @@ -53,21 +62,6 @@ impl PostcardCodec { max_response_size: max_block_size, } } - - /// Helper method for decoding data - /// Reusable across `RequestResponseCodec` and `GossipsubCodec` - fn deserialize<'a, R: Deserialize<'a>>( - &self, - encoded_data: &'a [u8], - ) -> Result { - postcard::from_bytes(encoded_data) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string())) - } - - fn serialize(&self, data: &D) -> Result, io::Error> { - postcard::to_stdvec(&data) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string())) - } } /// Since Postcard does not support async reads or writes out of the box @@ -81,7 +75,7 @@ impl PostcardCodec { impl RequestResponseCodec for PostcardCodec { type Protocol = MessageExchangePostcardProtocol; type Request = RequestMessage; - type Response = NetworkResponse; + type Response = ResponseMessage; async fn read_request( &mut self, @@ -92,8 +86,7 @@ impl RequestResponseCodec for PostcardCodec { T: AsyncRead + Unpin + Send, { let encoded_data = read_length_prefixed(socket, MAX_REQUEST_SIZE).await?; - - self.deserialize(&encoded_data) + deserialize(&encoded_data) } async fn read_response( @@ -105,8 +98,7 @@ impl RequestResponseCodec for PostcardCodec { T: futures::AsyncRead + Unpin + Send, { let encoded_data = read_length_prefixed(socket, self.max_response_size).await?; - - self.deserialize(&encoded_data) + deserialize(&encoded_data) } async fn write_request( @@ -118,15 +110,10 @@ impl RequestResponseCodec for PostcardCodec { where T: futures::AsyncWrite + Unpin + Send, { - match postcard::to_stdvec(&req) { - Ok(encoded_data) => { - write_length_prefixed(socket, encoded_data).await?; - socket.close().await?; - - Ok(()) - } - Err(e) => Err(io::Error::new(io::ErrorKind::Other, e.to_string())), - } + let encoded_data = serialize(&req)?; + write_length_prefixed(socket, encoded_data).await?; + socket.close().await?; + Ok(()) } async fn write_response( @@ -138,15 +125,10 @@ impl RequestResponseCodec for PostcardCodec { where T: futures::AsyncWrite + Unpin + Send, { - match postcard::to_stdvec(&res) { - Ok(encoded_data) => { - write_length_prefixed(socket, encoded_data).await?; - socket.close().await?; - - Ok(()) - } - Err(e) => Err(io::Error::new(io::ErrorKind::Other, e.to_string())), - } + let encoded_data = serialize(&res)?; + write_length_prefixed(socket, encoded_data).await?; + socket.close().await?; + Ok(()) } } @@ -168,87 +150,13 @@ impl GossipsubCodec for PostcardCodec { gossipsub_tag: GossipTopicTag, ) -> Result { let decoded_response = match gossipsub_tag { - GossipTopicTag::NewTx => { - GossipsubMessage::NewTx(self.deserialize(encoded_data)?) - } + GossipTopicTag::NewTx => GossipsubMessage::NewTx(deserialize(encoded_data)?), }; Ok(decoded_response) } } -impl RequestResponseConverter for PostcardCodec { - type NetworkResponse = NetworkResponse; - type OutboundResponse = OutboundResponse; - type ResponseMessage = ResponseMessage; - - fn convert_to_response( - &self, - inter_msg: &Self::NetworkResponse, - ) -> Result { - match inter_msg { - NetworkResponse::Block(block_bytes) => { - let response = if let Some(block_bytes) = block_bytes { - Some(self.deserialize(block_bytes)?) - } else { - None - }; - - Ok(ResponseMessage::SealedBlock(Box::new(response))) - } - NetworkResponse::Transactions(tx_bytes) => { - let response = if let Some(tx_bytes) = tx_bytes { - Some(self.deserialize(tx_bytes)?) - } else { - None - }; - - Ok(ResponseMessage::Transactions(response)) - } - NetworkResponse::Headers(headers_bytes) => { - let response = headers_bytes - .as_ref() - .map(|bytes| self.deserialize(bytes)) - .transpose()?; - Ok(ResponseMessage::SealedHeaders(response)) - } - } - } - - fn convert_to_network_response( - &self, - res_msg: &Self::OutboundResponse, - ) -> Result { - match res_msg { - OutboundResponse::Block(sealed_block) => { - let response = if let Some(sealed_block) = sealed_block { - Some(self.serialize(sealed_block.as_ref())?) - } else { - None - }; - - Ok(NetworkResponse::Block(response)) - } - OutboundResponse::Transactions(transactions) => { - let response = if let Some(transactions) = transactions { - Some(self.serialize(transactions.as_ref())?) - } else { - None - }; - - Ok(NetworkResponse::Transactions(response)) - } - OutboundResponse::SealedHeaders(maybe_headers) => { - let response = maybe_headers - .as_ref() - .map(|headers| self.serialize(&headers)) - .transpose()?; - Ok(NetworkResponse::Headers(response)) - } - } - } -} - impl NetworkCodec for PostcardCodec { fn get_req_res_protocol(&self) -> ::Protocol { MessageExchangePostcardProtocol {} diff --git a/crates/services/p2p/src/p2p_service.rs b/crates/services/p2p/src/p2p_service.rs index e1f277598e..ea33e528da 100644 --- a/crates/services/p2p/src/p2p_service.rs +++ b/crates/services/p2p/src/p2p_service.rs @@ -21,13 +21,11 @@ use crate::{ }, peer_report::PeerReportEvent, request_response::messages::{ - NetworkResponse, - OutboundResponse, RequestError, RequestMessage, ResponseChannelItem, - ResponseError, ResponseMessage, + ResponseSendError, }, }; use fuel_core_metrics::p2p_metrics::p2p_metrics; @@ -64,6 +62,7 @@ use libp2p::{ use rand::seq::IteratorRandom; use std::{ collections::HashMap, + sync::Arc, time::Duration, }; use tracing::{ @@ -100,7 +99,7 @@ pub struct FuelP2PService { /// Holds the ResponseChannel(s) for the inbound requests from the p2p Network /// Once the Response is prepared by the NetworkOrchestrator /// It will send it to the specified Peer via its unique ResponseChannel - inbound_requests_table: HashMap>, + inbound_requests_table: HashMap>, /// NetworkCodec used as for encoding and decoding of Gossipsub messages network_codec: Codec, @@ -303,6 +302,7 @@ impl FuelP2PService { /// Sends RequestMessage to a peer /// If the peer is not defined it will pick one at random + /// Only returns error if no peers are connected pub fn send_request_msg( &mut self, peer_id: Option, @@ -339,31 +339,21 @@ impl FuelP2PService { pub fn send_response_msg( &mut self, request_id: RequestId, - message: OutboundResponse, - ) -> Result<(), ResponseError> { - match ( - self.network_codec.convert_to_network_response(&message), - self.inbound_requests_table.remove(&request_id), - ) { - (Ok(message), Some(channel)) => { - if self - .swarm - .behaviour_mut() - .send_response_msg(channel, message) - .is_err() - { - debug!("Failed to send ResponseMessage for {:?}", request_id); - return Err(ResponseError::SendingResponseFailed) - } - } - (Ok(_), None) => { - debug!("ResponseChannel for {:?} does not exist!", request_id); - return Err(ResponseError::ResponseChannelDoesNotExist) - } - (Err(e), _) => { - debug!("Failed to convert to IntermediateResponse with {:?}", e); - return Err(ResponseError::ConversionToIntermediateFailed) - } + message: ResponseMessage, + ) -> Result<(), ResponseSendError> { + let Some(channel) = self.inbound_requests_table.remove(&request_id) else { + debug!("ResponseChannel for {:?} does not exist!", request_id); + return Err(ResponseSendError::ResponseChannelDoesNotExist) + }; + + if self + .swarm + .behaviour_mut() + .send_response_msg(channel, message) + .is_err() + { + debug!("Failed to send ResponseMessage for {:?}", request_id); + return Err(ResponseSendError::SendingResponseFailed) } Ok(()) @@ -593,51 +583,37 @@ impl FuelP2PService { request_id, response, } => { - match ( - self.outbound_requests_table.remove(&request_id), - self.network_codec.convert_to_response(&response), - ) { + let Some(channel) = + self.outbound_requests_table.remove(&request_id) + else { + debug!("Send channel not found for {:?}", request_id); + return None; + }; + + let send_ok = match (channel, response) { ( - Some(ResponseChannelItem::Block(channel)), - Ok(ResponseMessage::SealedBlock(block)), - ) => { - if channel.send(*block).is_err() { - debug!( - "Failed to send through the channel for {:?}", - request_id - ); - } - } + ResponseChannelItem::Block(channel), + ResponseMessage::Block(block), + ) => channel.send(block.map(|b| Arc::into_inner(b).expect("There are not other references, we just received this from the network"))).is_ok(), ( - Some(ResponseChannelItem::Transactions(channel)), - Ok(ResponseMessage::Transactions(transactions)), - ) => { - if channel.send(transactions).is_err() { - debug!( - "Failed to send through the channel for {:?}", - request_id - ); - } - } + ResponseChannelItem::Transactions(channel), + ResponseMessage::Transactions(transactions), + ) => channel.send(transactions.map(|b| Arc::into_inner(b).expect("There are not other references, we just received this from the network"))).is_ok(), ( - Some(ResponseChannelItem::SealedHeaders(channel)), - Ok(ResponseMessage::SealedHeaders(headers)), - ) => { - if channel.send((peer, headers)).is_err() { - debug!( - "Failed to send through the channel for {:?}", - request_id - ); - } + ResponseChannelItem::SealedHeaders(channel), + ResponseMessage::SealedHeaders(headers), + ) => channel.send((peer, headers)).is_ok(), + _ => { + debug!("Mismatching reponse channel and message types"); + return None; } + }; - (Some(_), Err(e)) => { - debug!("Failed to convert IntermediateResponse into a ResponseMessage {:?} with {:?}", response, e); - } - (None, Ok(_)) => { - debug!("Send channel not found for {:?}", request_id); - } - _ => {} + if !send_ok { + debug!( + "Failed to send through the channel for {:?}", + request_id + ); } } }, @@ -687,9 +663,9 @@ mod tests { p2p_service::FuelP2PEvent, peer_manager::PeerInfo, request_response::messages::{ - OutboundResponse, RequestMessage, ResponseChannelItem, + ResponseMessage, }, service::to_message_acceptance, }; @@ -1574,17 +1550,17 @@ mod tests { consensus: Consensus::PoA(PoAConsensus::new(Default::default())), }; - let _ = node_b.send_response_msg(*request_id, OutboundResponse::Block(Some(Arc::new(sealed_block)))); + let _ = node_b.send_response_msg(*request_id, ResponseMessage::Block(Some(Arc::new(sealed_block)))); } RequestMessage::SealedHeaders(range) => { let sealed_headers: Vec<_> = arbitrary_headers_for_range(range.clone()); - let _ = node_b.send_response_msg(*request_id, OutboundResponse::SealedHeaders(Some(sealed_headers))); + let _ = node_b.send_response_msg(*request_id, ResponseMessage::SealedHeaders(Some(sealed_headers))); } RequestMessage::Transactions(_) => { let txs = (0..5).map(|_| Transaction::default_test_tx()).collect(); let transactions = vec![Transactions(txs)]; - let _ = node_b.send_response_msg(*request_id, OutboundResponse::Transactions(Some(Arc::new(transactions)))); + let _ = node_b.send_response_msg(*request_id, ResponseMessage::Transactions(Some(Arc::new(transactions)))); } } } diff --git a/crates/services/p2p/src/request_response/messages.rs b/crates/services/p2p/src/request_response/messages.rs index 39ea25405e..0778342cd5 100644 --- a/crates/services/p2p/src/request_response/messages.rs +++ b/crates/services/p2p/src/request_response/messages.rs @@ -24,16 +24,6 @@ pub(crate) const REQUEST_RESPONSE_PROTOCOL_ID: &[u8] = b"/fuel/req_res/0.0.1"; /// Max Size in Bytes of the Request Message pub(crate) const MAX_REQUEST_SIZE: usize = core::mem::size_of::(); -// Peer receives a `RequestMessage`. -// It prepares a response in form of `OutboundResponse` -// This `OutboundResponse` gets prepared to be sent over the wire in `NetworkResponse` format. -// The Peer that requested the message receives the response over the wire in `NetworkResponse` format. -// It then unpacks it into `ResponseMessage`. -// `ResponseChannelItem` is used to forward the data within `ResponseMessage` to the receiving channel. -// Client Peer: `RequestMessage` (send request) -// Server Peer: `RequestMessage` (receive request) -> `OutboundResponse` -> `NetworkResponse` (send response) -// Client Peer: `NetworkResponse` (receive response) -> `ResponseMessage(data)` -> `ResponseChannelItem(channel, data)` (handle response) - #[derive(Serialize, Deserialize, Eq, PartialEq, Debug, Clone)] pub enum RequestMessage { Block(BlockHeight), @@ -41,14 +31,6 @@ pub enum RequestMessage { Transactions(Range), } -/// Final Response Message that p2p service sends to the Orchestrator -#[derive(Serialize, Deserialize, Debug, Clone)] -pub enum ResponseMessage { - SealedBlock(Box>), - SealedHeaders(Option>), - Transactions(Option>), -} - /// Holds oneshot channels for specific responses #[derive(Debug)] pub enum ResponseChannelItem { @@ -57,19 +39,8 @@ pub enum ResponseChannelItem { Transactions(oneshot::Sender>>), } -/// Response that is sent over the wire -/// and then additionally deserialized into `ResponseMessage` -#[derive(Serialize, Deserialize, Debug, Clone)] -pub enum NetworkResponse { - Block(Option>), - Headers(Option>), - Transactions(Option>), -} - -/// Initial state of the `ResponseMessage` prior to having its inner value serialized -/// and wrapped into `NetworkResponse` -#[derive(Debug, Clone)] -pub enum OutboundResponse { +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum ResponseMessage { Block(Option>), SealedHeaders(Option>), Transactions(Option>>), @@ -81,8 +52,9 @@ pub enum RequestError { NoPeersConnected, } +/// Errors than can occur when attempting to send a response #[derive(Debug, Eq, PartialEq, Error)] -pub enum ResponseError { +pub enum ResponseSendError { #[error("Response channel does not exist")] ResponseChannelDoesNotExist, #[error("Failed to send response")] diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index bff726fff8..455d8bd829 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -18,9 +18,9 @@ use crate::{ P2pDb, }, request_response::messages::{ - OutboundResponse, RequestMessage, ResponseChannelItem, + ResponseMessage, }, }; use anyhow::anyhow; @@ -166,6 +166,7 @@ pub trait TaskP2PService: Send { &mut self, message: GossipsubBroadcastRequest, ) -> anyhow::Result<()>; + fn send_request_msg( &mut self, peer_id: Option, @@ -176,7 +177,7 @@ pub trait TaskP2PService: Send { fn send_response_msg( &mut self, request_id: RequestId, - message: OutboundResponse, + message: ResponseMessage, ) -> anyhow::Result<()>; fn report_message( &mut self, @@ -232,7 +233,7 @@ impl TaskP2PService for FuelP2PService { fn send_response_msg( &mut self, request_id: RequestId, - message: OutboundResponse, + message: ResponseMessage, ) -> anyhow::Result<()> { self.send_response_msg(request_id, message)?; Ok(()) @@ -497,7 +498,10 @@ where let request_msg = RequestMessage::Block(height); let channel_item = ResponseChannelItem::Block(channel); let peer = self.p2p_service.get_peer_id_with_height(&height); - let _ = self.p2p_service.send_request_msg(peer, request_msg, channel_item); + let found_peers = self.p2p_service.send_request_msg(peer, request_msg, channel_item).is_ok(); + if !found_peers { + tracing::debug!("No peers found for block at height {:?}", height); + } } Some(TaskRequest::GetSealedHeaders { block_height_range, channel: response}) => { let request_msg = RequestMessage::SealedHeaders(block_height_range.clone()); @@ -508,12 +512,16 @@ where let block_height = BlockHeight::from(block_height_range.end.saturating_sub(1)); let peer = self.p2p_service .get_peer_id_with_height(&block_height); - let _ = self.p2p_service.send_request_msg(peer, request_msg, channel_item); + let found_peers = self.p2p_service.send_request_msg(peer, request_msg, channel_item).is_ok(); + if !found_peers { + tracing::debug!("No peers found for block at height {:?}", block_height); + } } Some(TaskRequest::GetTransactions { block_height_range, from_peer, channel }) => { let request_msg = RequestMessage::Transactions(block_height_range); let channel_item = ResponseChannelItem::Transactions(channel); - let _ = self.p2p_service.send_request_msg(Some(from_peer), request_msg, channel_item); + self.p2p_service.send_request_msg(Some(from_peer), request_msg, channel_item) + .expect("We always a peer here, so send has a target"); } Some(TaskRequest::RespondWithGossipsubMessageReport((message, acceptance))) => { // report_message(&mut self.p2p_service, message, acceptance); @@ -562,12 +570,12 @@ where match self.db.get_sealed_block(&block_height) { Ok(maybe_block) => { let response = maybe_block.map(Arc::new); - let _ = self.p2p_service.send_response_msg(request_id, OutboundResponse::Block(response)); + let _ = self.p2p_service.send_response_msg(request_id, ResponseMessage::Block(response)); }, Err(e) => { tracing::error!("Failed to get block at height {:?}: {:?}", block_height, e); let response = None; - let _ = self.p2p_service.send_response_msg(request_id, OutboundResponse::Block(response)); + let _ = self.p2p_service.send_response_msg(request_id, ResponseMessage::Block(response)); return Err(e.into()) } } @@ -576,12 +584,12 @@ where match self.db.get_transactions(range.clone()) { Ok(maybe_transactions) => { let response = maybe_transactions.map(Arc::new); - let _ = self.p2p_service.send_response_msg(request_id, OutboundResponse::Transactions(response)); + let _ = self.p2p_service.send_response_msg(request_id, ResponseMessage::Transactions(response)); }, Err(e) => { tracing::error!("Failed to get transactions for range {:?}: {:?}", range, e); let response = None; - let _ = self.p2p_service.send_response_msg(request_id, OutboundResponse::Transactions(response)); + let _ = self.p2p_service.send_response_msg(request_id, ResponseMessage::Transactions(response)); return Err(e.into()) } } @@ -592,17 +600,17 @@ where tracing::error!("Requested range of sealed headers is too big. Requested length: {:?}, Max length: {:?}", range.len(), max_len); // TODO: Return helpful error message to requester. https://github.com/FuelLabs/fuel-core/issues/1311 let response = None; - let _ = self.p2p_service.send_response_msg(request_id, OutboundResponse::SealedHeaders(response)); + let _ = self.p2p_service.send_response_msg(request_id, ResponseMessage::SealedHeaders(response)); } else { match self.db.get_sealed_headers(range.clone()) { Ok(headers) => { let response = Some(headers); - let _ = self.p2p_service.send_response_msg(request_id, OutboundResponse::SealedHeaders(response)); + let _ = self.p2p_service.send_response_msg(request_id, ResponseMessage::SealedHeaders(response)); }, Err(e) => { tracing::error!("Failed to get sealed headers for range {:?}: {:?}", range, &e); let response = None; - let _ = self.p2p_service.send_response_msg(request_id, OutboundResponse::SealedHeaders(response)); + let _ = self.p2p_service.send_response_msg(request_id, ResponseMessage::SealedHeaders(response)); return Err(e.into()) } } @@ -971,7 +979,7 @@ pub mod tests { fn send_response_msg( &mut self, _request_id: RequestId, - _message: OutboundResponse, + _message: ResponseMessage, ) -> anyhow::Result<()> { todo!() } From d93ccd3197c17d01b7d8f2fef8aaaa51d51ae547 Mon Sep 17 00:00:00 2001 From: Hannes Karppila Date: Fri, 22 Dec 2023 14:46:41 +0200 Subject: [PATCH 2/4] Add changelog --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index b2cf097802..2934603b73 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/). Description of the upcoming release here. +#### Breaking +- [#1573](https://github.com/FuelLabs/fuel-core/pull/1573): Remove nested p2p request/response encoding. Only breaks p2p networking compatibility with older fuel-core versions, but is otherwise fully internal. + ## [Version 0.22.0] ### Added From f233e248aa4e08c7a574a70949a12329d27be6b8 Mon Sep 17 00:00:00 2001 From: Hannes Karppila Date: Fri, 22 Dec 2023 17:59:02 +0200 Subject: [PATCH 3/4] Add a missing linebreak --- crates/services/p2p/src/service.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index ece79af0ee..d923b0d84b 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -179,6 +179,7 @@ pub trait TaskP2PService: Send { request_id: InboundRequestId, message: ResponseMessage, ) -> anyhow::Result<()>; + fn report_message( &mut self, message: GossipsubMessageInfo, From 8ca11b166bd6c48679eda0b73d6b360c29e6bc08 Mon Sep 17 00:00:00 2001 From: xgreenx Date: Sat, 6 Jan 2024 03:22:00 +0100 Subject: [PATCH 4/4] Remove `Arc` --- crates/services/p2p/src/p2p_service.rs | 23 ++++++++----------- .../p2p/src/request_response/messages.rs | 12 ++++------ crates/services/p2p/src/service.rs | 6 ++--- 3 files changed, 15 insertions(+), 26 deletions(-) diff --git a/crates/services/p2p/src/p2p_service.rs b/crates/services/p2p/src/p2p_service.rs index 65bcc3f4c2..de80500632 100644 --- a/crates/services/p2p/src/p2p_service.rs +++ b/crates/services/p2p/src/p2p_service.rs @@ -3,6 +3,10 @@ use crate::{ FuelBehaviour, FuelBehaviourEvent, }, + codecs::{ + postcard::PostcardCodec, + GossipsubCodec, + }, config::{ build_transport_function, Config, @@ -14,6 +18,7 @@ use crate::{ }, topics::GossipsubTopics, }, + heartbeat::HeartbeatEvent, peer_manager::{ PeerManager, Punisher, @@ -57,19 +62,9 @@ use libp2p::{ SwarmBuilder, }; use libp2p_gossipsub::PublishError; - -use crate::{ - codecs::{ - postcard::PostcardCodec, - GossipsubCodec, - RequestResponseConverter, - }, - heartbeat::HeartbeatEvent, -}; use rand::seq::IteratorRandom; use std::{ collections::HashMap, - sync::Arc, time::Duration, }; use tracing::{ @@ -573,11 +568,11 @@ impl FuelP2PService { ( ResponseChannelItem::Block(channel), ResponseMessage::Block(block), - ) => channel.send(block.map(|b| Arc::into_inner(b).expect("There are not other references, we just received this from the network"))).is_ok(), + ) => channel.send(block).is_ok(), ( ResponseChannelItem::Transactions(channel), ResponseMessage::Transactions(transactions), - ) => channel.send(transactions.map(|b| Arc::into_inner(b).expect("There are not other references, we just received this from the network"))).is_ok(), + ) => channel.send(transactions).is_ok(), ( ResponseChannelItem::SealedHeaders(channel), ResponseMessage::SealedHeaders(headers), @@ -1579,7 +1574,7 @@ mod tests { consensus: Consensus::PoA(PoAConsensus::new(Default::default())), }; - let _ = node_b.send_response_msg(*request_id, ResponseMessage::Block(Some(Arc::new(sealed_block)))); + let _ = node_b.send_response_msg(*request_id, ResponseMessage::Block(Some(sealed_block))); } RequestMessage::SealedHeaders(range) => { let sealed_headers: Vec<_> = arbitrary_headers_for_range(range.clone()); @@ -1589,7 +1584,7 @@ mod tests { RequestMessage::Transactions(_) => { let txs = (0..5).map(|_| Transaction::default_test_tx()).collect(); let transactions = vec![Transactions(txs)]; - let _ = node_b.send_response_msg(*request_id, ResponseMessage::Transactions(Some(Arc::new(transactions)))); + let _ = node_b.send_response_msg(*request_id, ResponseMessage::Transactions(Some(transactions))); } } } diff --git a/crates/services/p2p/src/request_response/messages.rs b/crates/services/p2p/src/request_response/messages.rs index 8c7be510ce..2d82ac42dd 100644 --- a/crates/services/p2p/src/request_response/messages.rs +++ b/crates/services/p2p/src/request_response/messages.rs @@ -1,8 +1,3 @@ -use std::{ - ops::Range, - sync::Arc, -}; - use fuel_core_types::{ blockchain::{ SealedBlock, @@ -16,6 +11,7 @@ use serde::{ Deserialize, Serialize, }; +use std::ops::Range; use thiserror::Error; use tokio::sync::oneshot; @@ -40,11 +36,11 @@ pub enum ResponseChannelItem { Transactions(oneshot::Sender>>), } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize)] pub enum ResponseMessage { - Block(Option>), + Block(Option), SealedHeaders(Option>), - Transactions(Option>>), + Transactions(Option>), } #[derive(Debug, Error)] diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 147d6248b6..7b2eac2ab4 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -566,8 +566,7 @@ where match request_message { RequestMessage::Block(block_height) => { match self.db.get_sealed_block(&block_height) { - Ok(maybe_block) => { - let response = maybe_block.map(Arc::new); + Ok(response) => { let _ = self.p2p_service.send_response_msg(request_id, ResponseMessage::Block(response)); }, Err(e) => { @@ -580,8 +579,7 @@ where } RequestMessage::Transactions(range) => { match self.db.get_transactions(range.clone()) { - Ok(maybe_transactions) => { - let response = maybe_transactions.map(Arc::new); + Ok(response) => { let _ = self.p2p_service.send_response_msg(request_id, ResponseMessage::Transactions(response)); }, Err(e) => {