Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Peel off a ConnectionConfig value into SocketConfig #92

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::recv::ReceiveBuffer;
use crate::send::SendBuffer;
use crate::sent::SentPackets;
use crate::seq::CircularRangeInclusive;
use crate::socket;

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Error {
Expand Down Expand Up @@ -90,7 +91,6 @@ pub type Read = (usize, oneshot::Sender<io::Result<Vec<u8>>>);

#[derive(Clone, Copy, Debug)]
pub struct ConnectionConfig {
pub max_packet_size: u16,
pub max_conn_attempts: usize,
pub max_idle_timeout: Duration,
pub initial_timeout: Duration,
Expand All @@ -105,7 +105,6 @@ impl Default for ConnectionConfig {
Self {
max_conn_attempts: 3,
max_idle_timeout,
max_packet_size: congestion::DEFAULT_MAX_PACKET_SIZE_BYTES as u16,
initial_timeout: congestion::DEFAULT_INITIAL_TIMEOUT,
min_timeout: congestion::DEFAULT_MIN_TIMEOUT,
max_timeout: max_idle_timeout,
Expand All @@ -117,7 +116,6 @@ impl Default for ConnectionConfig {
impl From<ConnectionConfig> for congestion::Config {
fn from(value: ConnectionConfig) -> Self {
Self {
max_packet_size_bytes: u32::from(value.max_packet_size),
initial_timeout: value.initial_timeout,
min_timeout: value.min_timeout,
max_timeout: value.max_timeout,
Expand All @@ -131,6 +129,7 @@ pub struct Connection<const N: usize, P> {
state: State<N>,
cid: ConnectionId<P>,
config: ConnectionConfig,
socket_config: socket::SocketConfig,
endpoint: Endpoint,
peer_ts_diff: Duration,
peer_recv_window: u32,
Expand All @@ -147,6 +146,7 @@ impl<const N: usize, P: ConnectionPeer> Connection<N, P> {
pub fn new(
cid: ConnectionId<P>,
config: ConnectionConfig,
socket_config: socket::SocketConfig,
syn: Option<Packet>,
connected: oneshot::Sender<io::Result<()>>,
socket_events: mpsc::UnboundedSender<SocketEvent<P>>,
Expand All @@ -172,6 +172,7 @@ impl<const N: usize, P: ConnectionPeer> Connection<N, P> {
state: State::Connecting(Some(connected)),
cid,
config,
socket_config,
endpoint,
peer_ts_diff,
peer_recv_window,
Expand Down Expand Up @@ -216,7 +217,9 @@ impl<const N: usize, P: ConnectionPeer> Connection<N, P> {
let recv_buf = ReceiveBuffer::new(syn);
let send_buf = SendBuffer::new();

let congestion_ctrl = congestion::Controller::new(self.config.into());
let mut congestion_config: congestion::Config = self.config.into();
congestion_config.max_packet_size_bytes = u32::from(self.socket_config.max_packet_size);
let congestion_ctrl = congestion::Controller::new(congestion_config);

// NOTE: We initialize with the sequence number of the SYN-ACK minus 1 because the
// SYN-ACK contains the incremented sequence number (i.e. the next sequence
Expand Down Expand Up @@ -448,7 +451,7 @@ impl<const N: usize, P: ConnectionPeer> Connection<N, P> {
let mut payloads = Vec::new();
while window > 0 {
// TODO: Do not rely on approximation here. Account for header and extensions.
let max_data_size = cmp::min(window, usize::from(self.config.max_packet_size - 64));
let max_data_size = cmp::min(window, usize::from(self.socket_config.max_packet_size - 64));
let mut data = vec![0; max_data_size];
let n = send_buf.read(&mut data).unwrap();
if n == 0 {
Expand Down Expand Up @@ -1136,6 +1139,7 @@ mod test {
state: State::Connecting(Some(connected)),
cid,
config: ConnectionConfig::default(),
socket_config: socket::SocketConfig::default(),
endpoint,
peer_ts_diff: Duration::from_millis(100),
peer_recv_window: u32::MAX,
Expand Down
29 changes: 27 additions & 2 deletions src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use tokio::sync::{mpsc, oneshot};

use crate::cid::{ConnectionId, ConnectionIdGenerator, ConnectionPeer, StdConnectionIdGenerator};
use crate::conn::ConnectionConfig;
use crate::congestion;
use crate::event::{SocketEvent, StreamEvent};
use crate::packet::{Packet, PacketType};
use crate::stream::UtpStream;
Expand All @@ -22,7 +23,21 @@ struct Accept<P> {

const MAX_UDP_PAYLOAD_SIZE: usize = u16::MAX as usize;

#[derive(Clone, Copy, Debug)]
pub struct SocketConfig {
pub max_packet_size: u16,
}

impl Default for SocketConfig {
fn default() -> Self {
Self {
max_packet_size: congestion::DEFAULT_MAX_PACKET_SIZE_BYTES as u16,
}
}
}

pub struct UtpSocket<P> {
pub config: SocketConfig,
conns: Arc<RwLock<HashMap<ConnectionId<P>, ConnChannel>>>,
cid_gen: Mutex<StdConnectionIdGenerator<P>>,
accepts: mpsc::UnboundedSender<(Accept<P>, Option<ConnectionId<P>>)>,
Expand All @@ -31,8 +46,12 @@ pub struct UtpSocket<P> {

impl UtpSocket<SocketAddr> {
pub async fn bind(addr: SocketAddr) -> io::Result<Self> {
Self::bind_with_config(addr, SocketConfig::default()).await
}

pub async fn bind_with_config(addr: SocketAddr, config: SocketConfig) -> io::Result<Self> {
let socket = UdpSocket::bind(addr).await?;
let socket = Self::with_socket(socket);
let socket = Self::with_socket(socket, config);
Ok(socket)
}
}
Expand All @@ -41,7 +60,7 @@ impl<P> UtpSocket<P>
where
P: ConnectionPeer + 'static,
{
pub fn with_socket<S>(socket: S) -> Self
pub fn with_socket<S>(socket: S, config: SocketConfig) -> Self
where
S: AsyncUdpSocket<P> + 'static,
{
Expand All @@ -59,13 +78,15 @@ where
let (accepts_tx, mut accepts_rx) = mpsc::unbounded_channel();

let utp = Self {
config,
conns: Arc::clone(&conns),
cid_gen,
accepts: accepts_tx,
socket_events: socket_event_tx.clone(),
};

let socket = Arc::new(socket);
let socket_config = utp.config;
tokio::spawn(async move {
let mut buf = [0; MAX_UDP_PAYLOAD_SIZE];
loop {
Expand Down Expand Up @@ -107,6 +128,7 @@ where
let stream = UtpStream::new(
cid,
accept.config,
socket_config,
Some(packet),
socket_event_tx.clone(),
events_rx,
Expand Down Expand Up @@ -167,6 +189,7 @@ where
let stream = UtpStream::new(
cid,
accept.config,
socket_config,
Some(syn),
socket_event_tx.clone(),
events_rx,
Expand Down Expand Up @@ -255,6 +278,7 @@ where
let stream = UtpStream::new(
cid,
config,
self.config,
None,
self.socket_events.clone(),
events_rx,
Expand Down Expand Up @@ -290,6 +314,7 @@ where
let stream = UtpStream::new(
cid,
config,
self.config,
None,
self.socket_events.clone(),
events_rx,
Expand Down
6 changes: 4 additions & 2 deletions src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::cid::{ConnectionId, ConnectionPeer};
use crate::conn;
use crate::event::{SocketEvent, StreamEvent};
use crate::packet::Packet;
use crate::socket;

/// The size of the send and receive buffers.
// TODO: Make the buffer size configurable.
Expand All @@ -25,14 +26,15 @@ where
{
pub(crate) fn new(
cid: ConnectionId<P>,
config: conn::ConnectionConfig,
conn_config: conn::ConnectionConfig,
socket_config: socket::SocketConfig,
syn: Option<Packet>,
socket_events: mpsc::UnboundedSender<SocketEvent<P>>,
stream_events: mpsc::UnboundedReceiver<StreamEvent>,
connected: oneshot::Sender<io::Result<()>>,
) -> Self {
let mut conn =
conn::Connection::<BUF, P>::new(cid.clone(), config, syn, connected, socket_events);
conn::Connection::<BUF, P>::new(cid.clone(), conn_config, socket_config, syn, connected, socket_events);

let (shutdown_tx, shutdown_rx) = oneshot::channel();
let (reads_tx, reads_rx) = mpsc::unbounded_channel();
Expand Down