diff --git a/Cargo.lock b/Cargo.lock index 11050bd332..8521d0ca1e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4548,6 +4548,7 @@ dependencies = [ "async-trait", "bincode", "blake3", + "cbor4ii", "custom_debug", "delegate", "derive_builder", diff --git a/crates/examples/infra/mod.rs b/crates/examples/infra/mod.rs index 929232e45d..b79769df6e 100755 --- a/crates/examples/infra/mod.rs +++ b/crates/examples/infra/mod.rs @@ -62,7 +62,7 @@ use hotshot_types::{ }, HotShotConfig, PeerConfig, ValidatorConfig, }; -use libp2p_networking::network::GossipConfig; +use libp2p_networking::network::{GossipConfig, RequestResponseConfig}; use rand::{rngs::StdRng, SeedableRng}; use surf_disco::Url; use tracing::{debug, error, info, warn}; @@ -754,6 +754,7 @@ where let libp2p_network = Libp2pNetwork::from_config( config.clone(), GossipConfig::default(), + RequestResponseConfig::default(), bind_address, &public_key, &private_key, diff --git a/crates/hotshot/src/traits.rs b/crates/hotshot/src/traits.rs index 8667943189..1ff090fc06 100644 --- a/crates/hotshot/src/traits.rs +++ b/crates/hotshot/src/traits.rs @@ -20,7 +20,7 @@ pub mod implementations { combined_network::{CombinedNetworks, UnderlyingCombinedNetworks}, libp2p_network::{ derive_libp2p_keypair, derive_libp2p_multiaddr, derive_libp2p_peer_id, GossipConfig, - Libp2pMetricsValue, Libp2pNetwork, PeerInfoVec, + Libp2pMetricsValue, Libp2pNetwork, PeerInfoVec, RequestResponseConfig, }, memory_network::{MasterMap, MemoryNetwork}, push_cdn_network::{ diff --git a/crates/hotshot/src/traits/networking/libp2p_network.rs b/crates/hotshot/src/traits/networking/libp2p_network.rs index de0a76156a..6567450243 100644 --- a/crates/hotshot/src/traits/networking/libp2p_network.rs +++ b/crates/hotshot/src/traits/networking/libp2p_network.rs @@ -59,7 +59,7 @@ use libp2p_identity::{ ed25519::{self, SecretKey}, Keypair, PeerId, }; -pub use libp2p_networking::network::GossipConfig; +pub use libp2p_networking::network::{GossipConfig, RequestResponseConfig}; use libp2p_networking::{ network::{ behaviours::dht::record::{Namespace, RecordKey, RecordValue}, @@ -395,6 +395,7 @@ impl Libp2pNetwork { pub async fn from_config( mut config: NetworkConfig, gossip_config: GossipConfig, + request_response_config: RequestResponseConfig, bind_address: Multiaddr, pub_key: &K, priv_key: &K::PrivateKey, @@ -414,6 +415,7 @@ impl Libp2pNetwork { // Set the gossip configuration config_builder.gossip_config(gossip_config.clone()); + config_builder.request_response_config(request_response_config); // Extrapolate the stake table from the known nodes let stake_table: HashSet = config diff --git a/crates/libp2p-networking/Cargo.toml b/crates/libp2p-networking/Cargo.toml index ecf4aa20fb..727ee9146a 100644 --- a/crates/libp2p-networking/Cargo.toml +++ b/crates/libp2p-networking/Cargo.toml @@ -39,6 +39,7 @@ void = "1" lazy_static = { workspace = true } pin-project = "1" portpicker.workspace = true +cbor4ii = "0.3" [target.'cfg(all(async_executor_impl = "tokio"))'.dependencies] libp2p = { workspace = true, features = ["tokio"] } diff --git a/crates/libp2p-networking/src/network/cbor.rs b/crates/libp2p-networking/src/network/cbor.rs new file mode 100644 index 0000000000..a289b998b5 --- /dev/null +++ b/crates/libp2p-networking/src/network/cbor.rs @@ -0,0 +1,145 @@ +use async_trait::async_trait; +use cbor4ii::core::error::DecodeError; +use futures::prelude::*; +use libp2p::{ + request_response::{self, Codec}, + StreamProtocol, +}; +use serde::{de::DeserializeOwned, Serialize}; +use std::{collections::TryReserveError, convert::Infallible, io, marker::PhantomData}; + +/// `Behaviour` type alias for the `Cbor` codec +pub type Behaviour = request_response::Behaviour>; + +/// Forked `cbor` codec with altered request/response sizes +pub struct Cbor { + /// Phantom data + phantom: PhantomData<(Req, Resp)>, + /// Maximum request size in bytes + request_size_maximum: u64, + /// Maximum response size in bytes + response_size_maximum: u64, +} + +impl Default for Cbor { + fn default() -> Self { + Cbor { + phantom: PhantomData, + request_size_maximum: 20 * 1024 * 1024, + response_size_maximum: 20 * 1024 * 1024, + } + } +} + +impl Cbor { + /// Create a new `Cbor` codec with the given request and response sizes + #[must_use] + pub fn new(request_size_maximum: u64, response_size_maximum: u64) -> Self { + Cbor { + phantom: PhantomData, + request_size_maximum, + response_size_maximum, + } + } +} + +impl Clone for Cbor { + fn clone(&self) -> Self { + Self::default() + } +} + +#[async_trait] +impl Codec for Cbor +where + Req: Send + Serialize + DeserializeOwned, + Resp: Send + Serialize + DeserializeOwned, +{ + type Protocol = StreamProtocol; + type Request = Req; + type Response = Resp; + + async fn read_request(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + let mut vec = Vec::new(); + + io.take(self.request_size_maximum) + .read_to_end(&mut vec) + .await?; + + cbor4ii::serde::from_slice(vec.as_slice()).map_err(decode_into_io_error) + } + + async fn read_response(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + let mut vec = Vec::new(); + + io.take(self.response_size_maximum) + .read_to_end(&mut vec) + .await?; + + cbor4ii::serde::from_slice(vec.as_slice()).map_err(decode_into_io_error) + } + + async fn write_request( + &mut self, + _: &Self::Protocol, + io: &mut T, + req: Self::Request, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + let data: Vec = + cbor4ii::serde::to_vec(Vec::new(), &req).map_err(encode_into_io_error)?; + + io.write_all(data.as_ref()).await?; + + Ok(()) + } + + async fn write_response( + &mut self, + _: &Self::Protocol, + io: &mut T, + resp: Self::Response, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + let data: Vec = + cbor4ii::serde::to_vec(Vec::new(), &resp).map_err(encode_into_io_error)?; + + io.write_all(data.as_ref()).await?; + + Ok(()) + } +} + +/// Convert a `cbor4ii::serde::DecodeError` into an `io::Error` +fn decode_into_io_error(err: cbor4ii::serde::DecodeError) -> io::Error { + match err { + cbor4ii::serde::DecodeError::Core(DecodeError::Read(e)) => { + io::Error::new(io::ErrorKind::Other, e) + } + cbor4ii::serde::DecodeError::Core(e @ DecodeError::Unsupported { .. }) => { + io::Error::new(io::ErrorKind::Unsupported, e) + } + cbor4ii::serde::DecodeError::Core(e @ DecodeError::Eof { .. }) => { + io::Error::new(io::ErrorKind::UnexpectedEof, e) + } + cbor4ii::serde::DecodeError::Core(e) => io::Error::new(io::ErrorKind::InvalidData, e), + cbor4ii::serde::DecodeError::Custom(e) => { + io::Error::new(io::ErrorKind::Other, e.to_string()) + } + } +} + +/// Convert a `cbor4ii::serde::EncodeError` into an `io::Error` +fn encode_into_io_error(err: cbor4ii::serde::EncodeError) -> io::Error { + io::Error::new(io::ErrorKind::Other, err) +} diff --git a/crates/libp2p-networking/src/network/def.rs b/crates/libp2p-networking/src/network/def.rs index 39d6bdd1b2..ca0045dcb0 100644 --- a/crates/libp2p-networking/src/network/def.rs +++ b/crates/libp2p-networking/src/network/def.rs @@ -10,14 +10,14 @@ use libp2p::{ gossipsub::{Behaviour as GossipBehaviour, Event as GossipEvent, IdentTopic}, identify::{Behaviour as IdentifyBehaviour, Event as IdentifyEvent}, kad::store::MemoryStore, - request_response::{cbor, OutboundRequestId, ResponseChannel}, + request_response::{OutboundRequestId, ResponseChannel}, Multiaddr, }; use libp2p_identity::PeerId; use libp2p_swarm_derive::NetworkBehaviour; use tracing::{debug, error}; -use super::{behaviours::dht::store::ValidatedStore, NetworkEventInternal}; +use super::{behaviours::dht::store::ValidatedStore, cbor, NetworkEventInternal}; /// Overarching network behaviour performing: /// - network topology discovoery @@ -45,7 +45,7 @@ pub struct NetworkDef { /// purpose: directly messaging peer #[debug(skip)] - pub direct_message: libp2p::request_response::cbor::Behaviour, Vec>, + pub direct_message: cbor::Behaviour, Vec>, /// Auto NAT behaviour to determine if we are publically reachable and /// by which address @@ -60,7 +60,7 @@ impl NetworkDef { gossipsub: GossipBehaviour, dht: libp2p::kad::Behaviour>, identify: IdentifyBehaviour, - direct_message: cbor::Behaviour, Vec>, + direct_message: super::cbor::Behaviour, Vec>, autonat: autonat::Behaviour, ) -> NetworkDef { Self { diff --git a/crates/libp2p-networking/src/network/mod.rs b/crates/libp2p-networking/src/network/mod.rs index 693ef1e292..dea939556a 100644 --- a/crates/libp2p-networking/src/network/mod.rs +++ b/crates/libp2p-networking/src/network/mod.rs @@ -13,6 +13,9 @@ mod node; /// Alternative Libp2p transport implementations pub mod transport; +/// Forked `cbor` codec with altered request/response sizes +pub mod cbor; + use std::{collections::HashSet, fmt::Debug}; use futures::channel::oneshot::Sender; @@ -44,7 +47,7 @@ pub use self::{ node::{ spawn_network_node, GossipConfig, NetworkNode, NetworkNodeConfig, NetworkNodeConfigBuilder, NetworkNodeConfigBuilderError, NetworkNodeHandle, NetworkNodeReceiver, - DEFAULT_REPLICATION_FACTOR, + RequestResponseConfig, DEFAULT_REPLICATION_FACTOR, }, }; #[cfg(not(any(async_executor_impl = "async-std", async_executor_impl = "tokio")))] diff --git a/crates/libp2p-networking/src/network/node.rs b/crates/libp2p-networking/src/network/node.rs index 08e896d785..c3ec3f6316 100644 --- a/crates/libp2p-networking/src/network/node.rs +++ b/crates/libp2p-networking/src/network/node.rs @@ -41,7 +41,7 @@ use libp2p::{ identity::Keypair, kad::{store::MemoryStore, Behaviour, Config, Mode, Record}, request_response::{ - Behaviour as RequestResponse, Config as RequestResponseConfig, ProtocolSupport, + Behaviour as RequestResponse, Config as Libp2pRequestResponseConfig, ProtocolSupport, }, swarm::SwarmEvent, Multiaddr, StreamProtocol, Swarm, SwarmBuilder, @@ -53,7 +53,7 @@ use tracing::{debug, error, info, info_span, instrument, warn, Instrument}; pub use self::{ config::{ GossipConfig, NetworkNodeConfig, NetworkNodeConfigBuilder, NetworkNodeConfigBuilderError, - DEFAULT_REPLICATION_FACTOR, + RequestResponseConfig, DEFAULT_REPLICATION_FACTOR, }, handle::{spawn_network_node, NetworkNodeHandle, NetworkNodeReceiver}, }; @@ -62,6 +62,7 @@ use super::{ bootstrap::{self, DHTBootstrapTask, InputEvent}, store::ValidatedStore, }, + cbor::Cbor, gen_transport, BoxedTransport, ClientRequest, NetworkDef, NetworkError, NetworkEvent, NetworkEventInternal, }; @@ -269,10 +270,17 @@ impl NetworkNode { ); kadem.set_mode(Some(Mode::Server)); - let rrconfig = RequestResponseConfig::default(); + let rrconfig = Libp2pRequestResponseConfig::default(); - let direct_message: libp2p::request_response::cbor::Behaviour, Vec> = - RequestResponse::new( + // Create a new `cbor` codec with the given request and response sizes + let cbor = Cbor::new( + config.request_response_config.request_size_maximum, + config.request_response_config.response_size_maximum, + ); + + let direct_message: super::cbor::Behaviour, Vec> = + RequestResponse::with_codec( + cbor, [( StreamProtocol::new("/HotShot/direct_message/1.0"), ProtocolSupport::Full, diff --git a/crates/libp2p-networking/src/network/node/config.rs b/crates/libp2p-networking/src/network/node/config.rs index c51fc281ea..db9d5c13ad 100644 --- a/crates/libp2p-networking/src/network/node/config.rs +++ b/crates/libp2p-networking/src/network/node/config.rs @@ -33,6 +33,10 @@ pub struct NetworkNodeConfig { /// Configuration for `GossipSub` pub gossip_config: GossipConfig, + #[builder(default)] + /// Configuration for `RequestResponse` + pub request_response_config: RequestResponseConfig, + /// list of addresses to connect to at initialization pub to_connect_addrs: HashSet<(PeerId, Multiaddr)>, /// republication interval in DHT, must be much less than `ttl` @@ -151,3 +155,21 @@ impl Default for GossipConfig { } } } + +/// Configuration for Libp2p's request-response +#[derive(Clone, Debug)] +pub struct RequestResponseConfig { + /// The maximum request size in bytes + pub request_size_maximum: u64, + /// The maximum response size in bytes + pub response_size_maximum: u64, +} + +impl Default for RequestResponseConfig { + fn default() -> Self { + Self { + request_size_maximum: 20 * 1024 * 1024, + response_size_maximum: 20 * 1024 * 1024, + } + } +} diff --git a/crates/task-impls/src/da.rs b/crates/task-impls/src/da.rs index bada6f4ddf..557895e95d 100644 --- a/crates/task-impls/src/da.rs +++ b/crates/task-impls/src/da.rs @@ -7,13 +7,14 @@ use std::{marker::PhantomData, sync::Arc}; use async_broadcast::{Receiver, Sender}; +use async_compatibility_layer::art::async_spawn; use async_lock::RwLock; #[cfg(async_executor_impl = "async-std")] use async_std::task::spawn_blocking; use async_trait::async_trait; use hotshot_task::task::TaskState; use hotshot_types::{ - consensus::{OuterConsensus, View}, + consensus::{Consensus, OuterConsensus, View}, data::{DaProposal, PackedBundle}, event::{Event, EventType}, message::{Proposal, UpgradeLock}, @@ -22,6 +23,7 @@ use hotshot_types::{ traits::{ block_contents::vid_commitment, election::Membership, + network::ConnectedNetwork, node_implementation::{NodeImplementation, NodeType, Versions}, signature_key::SignatureKey, storage::Storage, @@ -231,6 +233,42 @@ impl, V: Versions> DaTaskState { tracing::debug!("DA vote recv, Main Task {:?}", vote.view_number()); diff --git a/crates/task-impls/src/request.rs b/crates/task-impls/src/request.rs index 6964e38cd7..abacf32ffd 100644 --- a/crates/task-impls/src/request.rs +++ b/crates/task-impls/src/request.rs @@ -180,9 +180,14 @@ impl> NetworkRequestState = self .da_membership @@ -205,7 +210,7 @@ impl> NetworkRequestState