diff --git a/src/conn.rs b/src/conn.rs index a90fd8e..e2b7e04 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -16,9 +16,11 @@ use crate::recv::ReceiveBuffer; use crate::send::SendBuffer; use crate::sent::SentPackets; use crate::seq::CircularRangeInclusive; +use crate::stream::BUFFER_CAPACITY; #[derive(Clone, Copy, Debug, PartialEq, Eq)] enum Error { + BufferCapacityExceeded, EmptyDataPayload, InvalidAckNum, InvalidFin, @@ -32,6 +34,7 @@ enum Error { impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let s = match self { + Self::BufferCapacityExceeded => "data written exceeds buffer capacity", Self::EmptyDataPayload => "missing payload in DATA packet", Self::InvalidAckNum => "received ACK for unsent packet", Self::InvalidFin => "received multiple FIN packets with distinct sequence numbers", @@ -52,8 +55,8 @@ impl From for io::ErrorKind { fn from(value: Error) -> Self { use Error::*; match value { - EmptyDataPayload | InvalidAckNum | InvalidFin | InvalidSeqNum | InvalidSyn - | SynFromAcceptor => io::ErrorKind::InvalidData, + BufferCapacityExceeded | EmptyDataPayload | InvalidAckNum | InvalidFin + | InvalidSeqNum | InvalidSyn | SynFromAcceptor => io::ErrorKind::InvalidData, Reset => io::ErrorKind::ConnectionReset, TimedOut => io::ErrorKind::TimedOut, } @@ -461,6 +464,12 @@ impl Connection { send_buf.write(&data).unwrap(); let _ = tx.send(Ok(data.len())); self.writable.notify_one(); + } else if data.len() > BUFFER_CAPACITY { + // TODO: refactor SendBuffer so we have 1 send buffer instead of 2 + let (_, tx) = self.pending_writes.pop_front().unwrap(); + let _ = tx.send(Err(io::Error::from(io::ErrorKind::from( + Error::BufferCapacityExceeded, + )))); } else { break; } diff --git a/src/stream.rs b/src/stream.rs index 1c331fd..4c38000 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -10,7 +10,7 @@ use crate::packet::Packet; /// The size of the send and receive buffers. // TODO: Make the buffer size configurable. -const BUF: usize = 1024 * 1024; +pub const BUFFER_CAPACITY: usize = 1024 * 1024; pub struct UtpStream

{ cid: ConnectionId

, @@ -31,8 +31,13 @@ where stream_events: mpsc::UnboundedReceiver, connected: oneshot::Sender>, ) -> Self { - let mut conn = - conn::Connection::::new(cid.clone(), config, syn, connected, socket_events); + let mut conn = conn::Connection::::new( + cid.clone(), + config, + syn, + connected, + socket_events, + ); let (shutdown_tx, shutdown_rx) = oneshot::channel(); let (reads_tx, reads_rx) = mpsc::unbounded_channel();