Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve existing error handling #46

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ async-trait = "0.1.64"
delay_map = "0.1.2"
futures = "0.3.26"
rand = "0.8.5"
thiserror = "1.0.40"
tokio = { version = "1.25.0", features = ["io-util", "rt-multi-thread", "macros", "net", "sync", "time"] }
tracing = { version = "0.1.37", features = ["std", "attributes", "log"] }

Expand Down
100 changes: 42 additions & 58 deletions src/conn.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::cmp;
use std::collections::VecDeque;
use std::fmt;
use std::io;
use std::time::{Duration, Instant};

use delay_map::HashMapDelay;
use futures::StreamExt;
use thiserror::Error;
use tokio::sync::{mpsc, oneshot, Notify};

use crate::cid::{ConnectionId, ConnectionPeer};
Expand All @@ -17,47 +17,32 @@ use crate::send::SendBuffer;
use crate::sent::SentPackets;
use crate::seq::CircularRangeInclusive;

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Error {
#[derive(Debug, Error)]
pub enum Error {
#[error("io error, {0}")]
Io(#[from] io::Error),
#[error("missing payload in DATA packet")]
EmptyDataPayload,
#[error("received ACK for unsent packet")]
InvalidAckNum,
#[error("received multiple FIN packets with distinct sequence numbers")]
InvalidFin,
#[error("received packet with sequence number outside of remote peer's [SYN,FIN] range")]
InvalidSeqNum,
#[error("received multiple SYN packets with distinct sequence numbers")]
InvalidSyn,
#[error("received RESET packet from remote peer")]
Reset,
#[error("received SYN packet from uTP connection acceptor")]
SynFromAcceptor,
TimedOut,
}

impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let s = match self {
Self::EmptyDataPayload => "missing payload in DATA packet",
Self::InvalidAckNum => "received ACK for unsent packet",
Self::InvalidFin => "received multiple FIN packets with distinct sequence numbers",
Self::InvalidSeqNum => {
"received packet with sequence number outside of remote peer's [SYN,FIN] range"
}
Self::InvalidSyn => "received multiple SYN packets with distinct sequence numbers",
Self::Reset => "received RESET packet from remote peer",
Self::SynFromAcceptor => "received SYN packet from connection acceptor",
Self::TimedOut => "connection timed out",
};

write!(f, "{s}")
}
}

impl From<Error> for io::ErrorKind {
fn from(value: Error) -> Self {
use Error::*;
match value {
EmptyDataPayload | InvalidAckNum | InvalidFin | InvalidSeqNum | InvalidSyn
| SynFromAcceptor => io::ErrorKind::InvalidData,
Reset => io::ErrorKind::ConnectionReset,
TimedOut => io::ErrorKind::TimedOut,
}
}
#[error("uTP connection timed out, {0}")]
ConnTimeOut(#[from] oneshot::error::RecvError),
#[error("idle uTP connection time out")]
ConnIdleTimeOut,
#[error("uTP connection closed")]
ConnClosed(String),
#[error("max uTP connection attempts made")]
MaxConnAttempts,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
Expand All @@ -67,7 +52,7 @@ enum Endpoint {
}

enum State<const N: usize> {
Connecting(Option<oneshot::Sender<io::Result<()>>>),
Connecting(Option<oneshot::Sender<Result<(), Error>>>),
Established {
recv_buf: ReceiveBuffer<N>,
send_buf: SendBuffer<N>,
Expand All @@ -85,8 +70,8 @@ enum State<const N: usize> {
},
}

pub type Write = (Vec<u8>, oneshot::Sender<io::Result<usize>>);
pub type Read = (usize, oneshot::Sender<io::Result<Vec<u8>>>);
pub type Write = (Vec<u8>, oneshot::Sender<Result<usize, Error>>);
pub type Read = (usize, oneshot::Sender<Result<Vec<u8>, Error>>);

#[derive(Clone, Copy, Debug)]
pub struct ConnectionConfig {
Expand Down Expand Up @@ -143,7 +128,7 @@ impl<const N: usize, P: ConnectionPeer> Connection<N, P> {
cid: ConnectionId<P>,
config: ConnectionConfig,
syn: Option<Packet>,
connected: oneshot::Sender<io::Result<()>>,
connected: oneshot::Sender<Result<(), Error>>,
socket_events: mpsc::UnboundedSender<SocketEvent<P>>,
) -> Self {
let (endpoint, peer_ts_diff, peer_recv_window) = match syn {
Expand Down Expand Up @@ -284,7 +269,7 @@ impl<const N: usize, P: ConnectionPeer> Connection<N, P> {
let unacked: Vec<u16> = self.unacked.keys().copied().collect();
tracing::warn!(?unacked, "idle timeout expired, closing...");

self.state = State::Closed { err: Some(Error::TimedOut) };
self.state = State::Closed { err: Some(Error::ConnIdleTimeOut) };
}
}
_ = &mut shutdown, if !shutting_down => {
Expand All @@ -297,7 +282,7 @@ impl<const N: usize, P: ConnectionPeer> Connection<N, P> {
self.shutdown();
}

if let State::Closed { err } = self.state {
if let State::Closed { ref err } = self.state {
tracing::debug!(?err, "uTP conn closing...");

self.process_reads();
Expand Down Expand Up @@ -424,13 +409,13 @@ impl<const N: usize, P: ConnectionPeer> Connection<N, P> {
..
} => (send_buf, sent_packets, recv_buf),
State::Closed { err, .. } => {
let result = match err {
Some(err) => Err(io::ErrorKind::from(*err)),
None => Ok(0),
};
while let Some(pending) = self.pending_writes.pop_front() {
let result = match err {
Some(err) => Err(Error::ConnClosed(format_args!("{err}").to_string())),
None => Ok(0),
};
let (.., tx) = pending;
let _ = tx.send(result.map_err(io::Error::from));
let _ = tx.send(result);
}
return;
}
Expand Down Expand Up @@ -517,7 +502,7 @@ impl<const N: usize, P: ConnectionPeer> Connection<N, P> {
}
State::Closed { err, .. } => {
let result = match err {
Some(err) => Err(io::Error::from(io::ErrorKind::from(*err))),
Some(err) => Err(Error::ConnClosed(format_args!("{err}").to_string())),
None => Ok(0),
};
let _ = tx.send(result);
Expand All @@ -534,13 +519,13 @@ impl<const N: usize, P: ConnectionPeer> Connection<N, P> {
State::Connecting(..) => return,
State::Established { recv_buf, .. } | State::Closing { recv_buf, .. } => recv_buf,
State::Closed { err } => {
let result = match err {
Some(err) => Err(io::ErrorKind::from(*err)),
None => Ok(vec![]),
};
while let Some(pending) = self.pending_reads.pop_front() {
let result = match err {
Some(err) => Err(Error::ConnClosed(format_args!("{err}").to_string())),
None => Ok(vec![]),
};
let (.., tx) = pending;
let _ = tx.send(result.clone().map_err(io::Error::from));
let _ = tx.send(result);
}
return;
}
Expand Down Expand Up @@ -574,7 +559,7 @@ impl<const N: usize, P: ConnectionPeer> Connection<N, P> {
}
State::Closed { err } => {
let result = match err {
Some(err) => Err(io::Error::from(io::ErrorKind::from(*err))),
Some(err) => Err(Error::ConnClosed(format_args!("{err}").to_string())),
None => Ok(vec![]),
};
let _ = tx.send(result);
Expand Down Expand Up @@ -606,12 +591,12 @@ impl<const N: usize, P: ConnectionPeer> Connection<N, P> {
State::Connecting(connected) => match &mut self.endpoint {
Endpoint::Initiator((syn, attempts)) => {
if *attempts >= self.config.max_conn_attempts {
let err = Error::TimedOut;
if let Some(connected) = connected.take() {
let err = io::Error::from(io::ErrorKind::from(err));
let _ = connected.send(Err(err));
let _ = connected.send(Err(Error::MaxConnAttempts));
}
self.state = State::Closed { err: Some(err) };
self.state = State::Closed {
err: Some(Error::MaxConnAttempts),
};
} else {
let seq = *syn;
*attempts += 1;
Expand Down Expand Up @@ -1095,7 +1080,6 @@ impl<const N: usize, P: ConnectionPeer> Connection<N, P> {
#[cfg(test)]
mod test {
use super::*;

use std::net::SocketAddr;

const BUF: usize = 2048;
Expand Down
97 changes: 51 additions & 46 deletions src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,44 @@ use std::io;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex, RwLock};

use tokio::net::UdpSocket;
use tokio::sync::{mpsc, oneshot};

use crate::cid::{ConnectionId, ConnectionIdGenerator, ConnectionPeer, StdConnectionIdGenerator};
use crate::conn;
use crate::conn::ConnectionConfig;
use crate::event::{SocketEvent, StreamEvent};
use crate::packet::{Packet, PacketType};
use crate::stream;
use crate::stream::UtpStream;
use crate::udp::AsyncUdpSocket;
use thiserror::Error;
use tokio::net::UdpSocket;
use tokio::sync::{mpsc, oneshot};

type ConnChannel = mpsc::UnboundedSender<StreamEvent>;

#[derive(Debug, Error)]
pub enum Error {
#[error("io error, {0}")]
Io(#[from] io::Error),
#[error("uTP stream error, {0}")]
Stream(#[from] stream::Error),
#[error("no uTP stream")]
NoStream,
#[error("idle timeout, uTP connection abort")]
ConnectionAbort,
#[error("opening uTP connection failed, {0}")]
ConnInit(#[from] conn::Error),
#[error("nonexistent connection id")]
NonExistentConnId,
#[error("notify channel failed, {0}")]
NotifyChannelRecv(#[from] oneshot::error::RecvError),
#[error("sending on accept connection channel failed")]
AcceptChannelSend,
}

pub struct SocketError(io::Error);

struct Accept<P> {
stream: oneshot::Sender<io::Result<UtpStream<P>>>,
stream: oneshot::Sender<Result<UtpStream<P>, Error>>,
config: ConnectionConfig,
}

Expand All @@ -30,7 +54,7 @@ pub struct UtpSocket<P> {
}

impl UtpSocket<SocketAddr> {
pub async fn bind(addr: SocketAddr) -> io::Result<Self> {
pub async fn bind(addr: SocketAddr) -> Result<Self, Error> {
let socket = UdpSocket::bind(addr).await?;
let socket = Self::with_socket(socket);
Ok(socket)
Expand Down Expand Up @@ -208,26 +232,23 @@ where
self.cid_gen.lock().unwrap().cid(peer, is_initiator)
}

pub async fn accept(&self, config: ConnectionConfig) -> io::Result<UtpStream<P>> {
pub async fn accept(&self, config: ConnectionConfig) -> Result<UtpStream<P>, Error> {
let (stream_tx, stream_rx) = oneshot::channel();
let accept = Accept {
stream: stream_tx,
config,
};
self.accepts
.send((accept, None))
.map_err(|_| io::Error::from(io::ErrorKind::NotConnected))?;
match stream_rx.await {
Ok(stream) => Ok(stream?),
Err(..) => Err(io::Error::from(io::ErrorKind::TimedOut)),
}
.map_err(|_| Error::AcceptChannelSend)?;
stream_rx.await?
}

pub async fn accept_with_cid(
&self,
cid: ConnectionId<P>,
config: ConnectionConfig,
) -> io::Result<UtpStream<P>> {
) -> Result<UtpStream<P>, Error> {
let (stream_tx, stream_rx) = oneshot::channel();
let accept = Accept {
stream: stream_tx,
Expand All @@ -236,13 +257,10 @@ where
self.accepts
.send((accept, Some(cid)))
.map_err(|_| io::Error::from(io::ErrorKind::NotConnected))?;
match stream_rx.await {
Ok(stream) => Ok(stream?),
Err(..) => Err(io::Error::from(io::ErrorKind::TimedOut)),
}
stream_rx.await?
}

pub async fn connect(&self, peer: P, config: ConnectionConfig) -> io::Result<UtpStream<P>> {
pub async fn connect(&self, peer: P, config: ConnectionConfig) -> Result<UtpStream<P>, Error> {
let cid = self.cid_gen.lock().unwrap().cid(peer, true);
let (connected_tx, connected_rx) = oneshot::channel();
let (events_tx, events_rx) = mpsc::unbounded_channel();
Expand All @@ -260,26 +278,22 @@ where
connected_tx,
);

match connected_rx.await {
Ok(Ok(..)) => Ok(stream),
Ok(Err(err)) => Err(err),
Err(..) => Err(io::Error::from(io::ErrorKind::TimedOut)),
match connected_rx.await? {
Ok(..) => Ok(stream),
Err(err) => Err(err.into()),
}
}

pub async fn connect_with_cid(
&self,
cid: ConnectionId<P>,
config: ConnectionConfig,
) -> io::Result<UtpStream<P>> {
) -> Result<UtpStream<P>, Error> {
if self.conns.read().unwrap().contains_key(&cid) {
return Err(io::Error::new(
io::ErrorKind::Other,
"connection ID unavailable".to_string(),
));
return Err(Error::NonExistentConnId);
}

let (connected_tx, connected_rx) = oneshot::channel();
let (init_stream_tx, init_stream_rx) = oneshot::channel();
let (events_tx, events_rx) = mpsc::unbounded_channel();

{
Expand All @@ -292,34 +306,25 @@ where
None,
self.socket_events.clone(),
events_rx,
connected_tx,
init_stream_tx,
);

match connected_rx.await {
Ok(Ok(..)) => Ok(stream),
Ok(Err(err)) => Err(err),
Err(..) => Err(io::Error::from(io::ErrorKind::TimedOut)),
match init_stream_rx.await? {
Ok(..) => Ok(stream),
Err(err) => Err(err.into()),
}
}

async fn await_connected(
stream: UtpStream<P>,
accept: Accept<P>,
connected: oneshot::Receiver<io::Result<()>>,
connected: oneshot::Receiver<Result<(), conn::Error>>,
) {
match connected.await {
Ok(Ok(..)) => {
let _ = accept.stream.send(Ok(stream));
}
Ok(Err(err)) => {
let _ = accept.stream.send(Err(err));
}
Err(..) => {
let _ = accept
.stream
.send(Err(io::Error::from(io::ErrorKind::ConnectionAborted)));
}
}
_ = match connected.await {
Ok(Ok(..)) => accept.stream.send(Ok(stream)),
Ok(Err(err)) => accept.stream.send(Err(err.into())),
Err(..) => accept.stream.send(Err(Error::ConnectionAbort)),
};
}
}

Expand Down
Loading