Skip to content

Commit

Permalink
bias conn event loop toward incoming packets and I/O (#90)
Browse files Browse the repository at this point in the history
  • Loading branch information
carver authored Jun 27, 2023
1 parent 3bc0778 commit 4225d1b
Showing 1 changed file with 7 additions and 6 deletions.
13 changes: 7 additions & 6 deletions src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ impl<const N: usize, P: ConnectionPeer> Connection<N, P> {
tokio::pin!(idle_timeout);
loop {
tokio::select! {
biased;
Some(event) = stream_events.recv() => {
match event {
StreamEvent::Incoming(packet) => {
Expand All @@ -263,12 +264,6 @@ impl<const N: usize, P: ConnectionPeer> Connection<N, P> {
}
}
}
Some(Ok(timeout)) = self.unacked.next() => {
let (seq, packet) = timeout;
tracing::debug!(seq, ack = %packet.ack_num(), packet = ?packet.packet_type(), "timeout");

self.on_timeout(packet, Instant::now());
}
Some(write) = writes.recv(), if !shutting_down => {
// Reset the idle timeout on any new write.
let idle_deadline = tokio::time::Instant::now() + self.config.max_idle_timeout;
Expand All @@ -285,6 +280,12 @@ impl<const N: usize, P: ConnectionPeer> Connection<N, P> {
_ = self.writable.notified() => {
self.process_writes(Instant::now());
}
Some(Ok(timeout)) = self.unacked.next() => {
let (seq, packet) = timeout;
tracing::debug!(seq, ack = %packet.ack_num(), packet = ?packet.packet_type(), "timeout");

self.on_timeout(packet, Instant::now());
}
() = &mut idle_timeout => {
if !std::matches!(self.state, State::Closed { .. }) {
let unacked: Vec<u16> = self.unacked.keys().copied().collect();
Expand Down

0 comments on commit 4225d1b

Please sign in to comment.