Skip to content

Commit

Permalink
Libp2p & VID fixes (#3829)
Browse files Browse the repository at this point in the history
* re-introduce optimistic VID

* cbor fork

* add leader to request pool
  • Loading branch information
rob-maron authored Nov 1, 2024
1 parent 84ae1f8 commit 88b9961
Show file tree
Hide file tree
Showing 12 changed files with 242 additions and 16 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion crates/examples/infra/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ use hotshot_types::{
},
HotShotConfig, PeerConfig, ValidatorConfig,
};
use libp2p_networking::network::GossipConfig;
use libp2p_networking::network::{GossipConfig, RequestResponseConfig};
use rand::{rngs::StdRng, SeedableRng};
use surf_disco::Url;
use tracing::{debug, error, info, warn};
Expand Down Expand Up @@ -754,6 +754,7 @@ where
let libp2p_network = Libp2pNetwork::from_config(
config.clone(),
GossipConfig::default(),
RequestResponseConfig::default(),
bind_address,
&public_key,
&private_key,
Expand Down
2 changes: 1 addition & 1 deletion crates/hotshot/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub mod implementations {
combined_network::{CombinedNetworks, UnderlyingCombinedNetworks},
libp2p_network::{
derive_libp2p_keypair, derive_libp2p_multiaddr, derive_libp2p_peer_id, GossipConfig,
Libp2pMetricsValue, Libp2pNetwork, PeerInfoVec,
Libp2pMetricsValue, Libp2pNetwork, PeerInfoVec, RequestResponseConfig,
},
memory_network::{MasterMap, MemoryNetwork},
push_cdn_network::{
Expand Down
4 changes: 3 additions & 1 deletion crates/hotshot/src/traits/networking/libp2p_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ use libp2p_identity::{
ed25519::{self, SecretKey},
Keypair, PeerId,
};
pub use libp2p_networking::network::GossipConfig;
pub use libp2p_networking::network::{GossipConfig, RequestResponseConfig};
use libp2p_networking::{
network::{
behaviours::dht::record::{Namespace, RecordKey, RecordValue},
Expand Down Expand Up @@ -395,6 +395,7 @@ impl<K: SignatureKey + 'static> Libp2pNetwork<K> {
pub async fn from_config(
mut config: NetworkConfig<K>,
gossip_config: GossipConfig,
request_response_config: RequestResponseConfig,
bind_address: Multiaddr,
pub_key: &K,
priv_key: &K::PrivateKey,
Expand All @@ -414,6 +415,7 @@ impl<K: SignatureKey + 'static> Libp2pNetwork<K> {

// Set the gossip configuration
config_builder.gossip_config(gossip_config.clone());
config_builder.request_response_config(request_response_config);

// Extrapolate the stake table from the known nodes
let stake_table: HashSet<K> = config
Expand Down
1 change: 1 addition & 0 deletions crates/libp2p-networking/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ void = "1"
lazy_static = { workspace = true }
pin-project = "1"
portpicker.workspace = true
cbor4ii = "0.3"

[target.'cfg(all(async_executor_impl = "tokio"))'.dependencies]
libp2p = { workspace = true, features = ["tokio"] }
Expand Down
145 changes: 145 additions & 0 deletions crates/libp2p-networking/src/network/cbor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
use async_trait::async_trait;
use cbor4ii::core::error::DecodeError;
use futures::prelude::*;
use libp2p::{
request_response::{self, Codec},
StreamProtocol,
};
use serde::{de::DeserializeOwned, Serialize};
use std::{collections::TryReserveError, convert::Infallible, io, marker::PhantomData};

/// `Behaviour` type alias for the `Cbor` codec
pub type Behaviour<Req, Resp> = request_response::Behaviour<Cbor<Req, Resp>>;

/// Forked `cbor` codec with altered request/response sizes
pub struct Cbor<Req, Resp> {
/// Phantom data
phantom: PhantomData<(Req, Resp)>,
/// Maximum request size in bytes
request_size_maximum: u64,
/// Maximum response size in bytes
response_size_maximum: u64,
}

impl<Req, Resp> Default for Cbor<Req, Resp> {
fn default() -> Self {
Cbor {
phantom: PhantomData,
request_size_maximum: 20 * 1024 * 1024,
response_size_maximum: 20 * 1024 * 1024,
}
}
}

impl<Req, Resp> Cbor<Req, Resp> {
/// Create a new `Cbor` codec with the given request and response sizes
#[must_use]
pub fn new(request_size_maximum: u64, response_size_maximum: u64) -> Self {
Cbor {
phantom: PhantomData,
request_size_maximum,
response_size_maximum,
}
}
}

impl<Req, Resp> Clone for Cbor<Req, Resp> {
fn clone(&self) -> Self {
Self::default()
}
}

#[async_trait]
impl<Req, Resp> Codec for Cbor<Req, Resp>
where
Req: Send + Serialize + DeserializeOwned,
Resp: Send + Serialize + DeserializeOwned,
{
type Protocol = StreamProtocol;
type Request = Req;
type Response = Resp;

async fn read_request<T>(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result<Req>
where
T: AsyncRead + Unpin + Send,
{
let mut vec = Vec::new();

io.take(self.request_size_maximum)
.read_to_end(&mut vec)
.await?;

cbor4ii::serde::from_slice(vec.as_slice()).map_err(decode_into_io_error)
}

async fn read_response<T>(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result<Resp>
where
T: AsyncRead + Unpin + Send,
{
let mut vec = Vec::new();

io.take(self.response_size_maximum)
.read_to_end(&mut vec)
.await?;

cbor4ii::serde::from_slice(vec.as_slice()).map_err(decode_into_io_error)
}

async fn write_request<T>(
&mut self,
_: &Self::Protocol,
io: &mut T,
req: Self::Request,
) -> io::Result<()>
where
T: AsyncWrite + Unpin + Send,
{
let data: Vec<u8> =
cbor4ii::serde::to_vec(Vec::new(), &req).map_err(encode_into_io_error)?;

io.write_all(data.as_ref()).await?;

Ok(())
}

async fn write_response<T>(
&mut self,
_: &Self::Protocol,
io: &mut T,
resp: Self::Response,
) -> io::Result<()>
where
T: AsyncWrite + Unpin + Send,
{
let data: Vec<u8> =
cbor4ii::serde::to_vec(Vec::new(), &resp).map_err(encode_into_io_error)?;

io.write_all(data.as_ref()).await?;

Ok(())
}
}

/// Convert a `cbor4ii::serde::DecodeError` into an `io::Error`
fn decode_into_io_error(err: cbor4ii::serde::DecodeError<Infallible>) -> io::Error {
match err {
cbor4ii::serde::DecodeError::Core(DecodeError::Read(e)) => {
io::Error::new(io::ErrorKind::Other, e)
}
cbor4ii::serde::DecodeError::Core(e @ DecodeError::Unsupported { .. }) => {
io::Error::new(io::ErrorKind::Unsupported, e)
}
cbor4ii::serde::DecodeError::Core(e @ DecodeError::Eof { .. }) => {
io::Error::new(io::ErrorKind::UnexpectedEof, e)
}
cbor4ii::serde::DecodeError::Core(e) => io::Error::new(io::ErrorKind::InvalidData, e),
cbor4ii::serde::DecodeError::Custom(e) => {
io::Error::new(io::ErrorKind::Other, e.to_string())
}
}
}

/// Convert a `cbor4ii::serde::EncodeError` into an `io::Error`
fn encode_into_io_error(err: cbor4ii::serde::EncodeError<TryReserveError>) -> io::Error {
io::Error::new(io::ErrorKind::Other, err)
}
8 changes: 4 additions & 4 deletions crates/libp2p-networking/src/network/def.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ use libp2p::{
gossipsub::{Behaviour as GossipBehaviour, Event as GossipEvent, IdentTopic},
identify::{Behaviour as IdentifyBehaviour, Event as IdentifyEvent},
kad::store::MemoryStore,
request_response::{cbor, OutboundRequestId, ResponseChannel},
request_response::{OutboundRequestId, ResponseChannel},
Multiaddr,
};
use libp2p_identity::PeerId;
use libp2p_swarm_derive::NetworkBehaviour;
use tracing::{debug, error};

use super::{behaviours::dht::store::ValidatedStore, NetworkEventInternal};
use super::{behaviours::dht::store::ValidatedStore, cbor, NetworkEventInternal};

/// Overarching network behaviour performing:
/// - network topology discovoery
Expand Down Expand Up @@ -45,7 +45,7 @@ pub struct NetworkDef<K: SignatureKey + 'static> {

/// purpose: directly messaging peer
#[debug(skip)]
pub direct_message: libp2p::request_response::cbor::Behaviour<Vec<u8>, Vec<u8>>,
pub direct_message: cbor::Behaviour<Vec<u8>, Vec<u8>>,

/// Auto NAT behaviour to determine if we are publically reachable and
/// by which address
Expand All @@ -60,7 +60,7 @@ impl<K: SignatureKey + 'static> NetworkDef<K> {
gossipsub: GossipBehaviour,
dht: libp2p::kad::Behaviour<ValidatedStore<MemoryStore, K>>,
identify: IdentifyBehaviour,
direct_message: cbor::Behaviour<Vec<u8>, Vec<u8>>,
direct_message: super::cbor::Behaviour<Vec<u8>, Vec<u8>>,
autonat: autonat::Behaviour,
) -> NetworkDef<K> {
Self {
Expand Down
5 changes: 4 additions & 1 deletion crates/libp2p-networking/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ mod node;
/// Alternative Libp2p transport implementations
pub mod transport;

/// Forked `cbor` codec with altered request/response sizes
pub mod cbor;

use std::{collections::HashSet, fmt::Debug};

use futures::channel::oneshot::Sender;
Expand Down Expand Up @@ -44,7 +47,7 @@ pub use self::{
node::{
spawn_network_node, GossipConfig, NetworkNode, NetworkNodeConfig, NetworkNodeConfigBuilder,
NetworkNodeConfigBuilderError, NetworkNodeHandle, NetworkNodeReceiver,
DEFAULT_REPLICATION_FACTOR,
RequestResponseConfig, DEFAULT_REPLICATION_FACTOR,
},
};
#[cfg(not(any(async_executor_impl = "async-std", async_executor_impl = "tokio")))]
Expand Down
18 changes: 13 additions & 5 deletions crates/libp2p-networking/src/network/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use libp2p::{
identity::Keypair,
kad::{store::MemoryStore, Behaviour, Config, Mode, Record},
request_response::{
Behaviour as RequestResponse, Config as RequestResponseConfig, ProtocolSupport,
Behaviour as RequestResponse, Config as Libp2pRequestResponseConfig, ProtocolSupport,
},
swarm::SwarmEvent,
Multiaddr, StreamProtocol, Swarm, SwarmBuilder,
Expand All @@ -53,7 +53,7 @@ use tracing::{debug, error, info, info_span, instrument, warn, Instrument};
pub use self::{
config::{
GossipConfig, NetworkNodeConfig, NetworkNodeConfigBuilder, NetworkNodeConfigBuilderError,
DEFAULT_REPLICATION_FACTOR,
RequestResponseConfig, DEFAULT_REPLICATION_FACTOR,
},
handle::{spawn_network_node, NetworkNodeHandle, NetworkNodeReceiver},
};
Expand All @@ -62,6 +62,7 @@ use super::{
bootstrap::{self, DHTBootstrapTask, InputEvent},
store::ValidatedStore,
},
cbor::Cbor,
gen_transport, BoxedTransport, ClientRequest, NetworkDef, NetworkError, NetworkEvent,
NetworkEventInternal,
};
Expand Down Expand Up @@ -269,10 +270,17 @@ impl<K: SignatureKey + 'static> NetworkNode<K> {
);
kadem.set_mode(Some(Mode::Server));

let rrconfig = RequestResponseConfig::default();
let rrconfig = Libp2pRequestResponseConfig::default();

let direct_message: libp2p::request_response::cbor::Behaviour<Vec<u8>, Vec<u8>> =
RequestResponse::new(
// Create a new `cbor` codec with the given request and response sizes
let cbor = Cbor::new(
config.request_response_config.request_size_maximum,
config.request_response_config.response_size_maximum,
);

let direct_message: super::cbor::Behaviour<Vec<u8>, Vec<u8>> =
RequestResponse::with_codec(
cbor,
[(
StreamProtocol::new("/HotShot/direct_message/1.0"),
ProtocolSupport::Full,
Expand Down
22 changes: 22 additions & 0 deletions crates/libp2p-networking/src/network/node/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ pub struct NetworkNodeConfig<K: SignatureKey + 'static> {
/// Configuration for `GossipSub`
pub gossip_config: GossipConfig,

#[builder(default)]
/// Configuration for `RequestResponse`
pub request_response_config: RequestResponseConfig,

/// list of addresses to connect to at initialization
pub to_connect_addrs: HashSet<(PeerId, Multiaddr)>,
/// republication interval in DHT, must be much less than `ttl`
Expand Down Expand Up @@ -151,3 +155,21 @@ impl Default for GossipConfig {
}
}
}

/// Configuration for Libp2p's request-response
#[derive(Clone, Debug)]
pub struct RequestResponseConfig {
/// The maximum request size in bytes
pub request_size_maximum: u64,
/// The maximum response size in bytes
pub response_size_maximum: u64,
}

impl Default for RequestResponseConfig {
fn default() -> Self {
Self {
request_size_maximum: 20 * 1024 * 1024,
response_size_maximum: 20 * 1024 * 1024,
}
}
}
Loading

0 comments on commit 88b9961

Please sign in to comment.