Skip to content

Commit

Permalink
Remove limit on send buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
KolbyML committed May 23, 2023
1 parent a2b619d commit 3ff3a73
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 60 deletions.
18 changes: 7 additions & 11 deletions src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,14 @@ enum State<const N: usize> {
Connecting(Option<oneshot::Sender<io::Result<()>>>),
Established {
recv_buf: ReceiveBuffer<N>,
send_buf: SendBuffer<N>,
send_buf: SendBuffer,
sent_packets: SentPackets,
},
Closing {
local_fin: Option<u16>,
remote_fin: Option<u16>,
recv_buf: ReceiveBuffer<N>,
send_buf: SendBuffer<N>,
send_buf: SendBuffer,
sent_packets: SentPackets,
},
Closed {
Expand Down Expand Up @@ -455,15 +455,11 @@ impl<const N: usize, P: ConnectionPeer> Connection<N, P> {
}

// Write as much data as possible into send buffer.
while let Some((data, ..)) = self.pending_writes.front() {
if data.len() <= send_buf.available() {
let (data, tx) = self.pending_writes.pop_front().unwrap();
send_buf.write(&data).unwrap();
let _ = tx.send(Ok(data.len()));
self.writable.notify_one();
} else {
break;
}
while self.pending_writes.front().is_some() {
let (data, tx) = self.pending_writes.pop_front().unwrap();
send_buf.write(&data).unwrap();
let _ = tx.send(Ok(data.len()));
self.writable.notify_one();
}

// Transmit data packets.
Expand Down
57 changes: 8 additions & 49 deletions src/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,18 @@ use std::io;
type Bytes = Vec<u8>;

#[derive(Clone, Debug)]
pub struct SendBuffer<const N: usize> {
pub struct SendBuffer {
pending: VecDeque<Bytes>,
offset: usize,
}

impl<const N: usize> Default for SendBuffer<N> {
impl Default for SendBuffer {
fn default() -> Self {
Self {
pending: VecDeque::new(),
offset: 0,
}
Self::new()
}
}

impl<const N: usize> SendBuffer<N> {
impl SendBuffer {
/// Creates a new buffer.
pub fn new() -> Self {
Self {
Expand All @@ -27,26 +24,15 @@ impl<const N: usize> SendBuffer<N> {
}
}

/// Returns the number of bytes available in the buffer.
pub fn available(&self) -> usize {
N - self.pending.iter().fold(0, |acc, x| acc + x.len()) + self.offset
}

/// Returns `true` if the buffer is empty.
pub fn is_empty(&self) -> bool {
self.pending.is_empty()
}

/// Writes `data` into the buffer, returning the number of bytes written.
pub fn write(&mut self, data: &[u8]) -> io::Result<usize> {
let available = self.available();
if data.len() <= available {
self.pending.push_back(data.to_vec());
Ok(data.len())
} else {
self.pending.push_back(data[..available].to_vec());
Ok(available)
}
self.pending.push_back(data.to_vec());
Ok(data.len())
}

/// Reads data from the buffer into `buf`, returning the number of bytes read.
Expand Down Expand Up @@ -82,37 +68,10 @@ mod test {

const SIZE: usize = 8192;

#[test]
fn available() {
let mut buf = SendBuffer::<SIZE>::new();
assert_eq!(buf.available(), SIZE);

const WRITE_LEN: usize = 512;
const NUM_WRITES: usize = 3;

const READ_LEN: usize = 64;

for _ in 0..NUM_WRITES {
let data = vec![0; WRITE_LEN];
buf.write(&data).unwrap();
}
assert_eq!(buf.available(), SIZE - (WRITE_LEN * NUM_WRITES));

let mut data = vec![0; READ_LEN];
buf.read(&mut data).unwrap();
assert_eq!(buf.available(), SIZE - (WRITE_LEN * NUM_WRITES) + READ_LEN);

for _ in 0..NUM_WRITES {
let mut data = vec![0; WRITE_LEN];
buf.read(&mut data).unwrap();
}
assert_eq!(buf.available(), SIZE);
}

#[test]
#[allow(clippy::read_zero_byte_vec)]
fn read() {
let mut buf = SendBuffer::<SIZE>::new();
let mut buf = SendBuffer::new();

// Read of empty buffer returns zero.
let mut read_buf = vec![0; SIZE];
Expand Down Expand Up @@ -154,7 +113,7 @@ mod test {

#[test]
fn write() {
let mut buf = SendBuffer::<SIZE>::new();
let mut buf = SendBuffer::new();

const WRITE_LEN: usize = 1024;

Expand Down

0 comments on commit 3ff3a73

Please sign in to comment.