Skip to content

Commit

Permalink
Stop write task if io is closed (#416)
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 authored Sep 7, 2024
1 parent 3edb54f commit db6d3a6
Show file tree
Hide file tree
Showing 10 changed files with 59 additions and 25 deletions.
2 changes: 1 addition & 1 deletion ntex-io/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-io"
version = "2.3.1"
version = "2.4.0"
authors = ["ntex contributors <[email protected]>"]
description = "Utilities for encoding and decoding frames"
keywords = ["network", "framework", "async", "futures"]
Expand Down
35 changes: 18 additions & 17 deletions ntex-io/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,25 +97,26 @@ impl Filter for Base {

if flags.contains(Flags::IO_STOPPED) {
Poll::Ready(WriteStatus::Terminate)
} else if flags.intersects(Flags::IO_STOPPING) {
Poll::Ready(WriteStatus::Shutdown(
self.0 .0.disconnect_timeout.get().into(),
))
} else if flags.contains(Flags::IO_STOPPING_FILTERS)
&& !flags.contains(Flags::IO_FILTERS_TIMEOUT)
{
flags.insert(Flags::IO_FILTERS_TIMEOUT);
self.0.set_flags(flags);
self.0 .0.write_task.register(cx.waker());
Poll::Ready(WriteStatus::Timeout(
self.0 .0.disconnect_timeout.get().into(),
))
} else if flags.intersects(Flags::WR_PAUSED) {
self.0 .0.write_task.register(cx.waker());
Poll::Pending
} else {
self.0 .0.write_task.register(cx.waker());
Poll::Ready(WriteStatus::Ready)

if flags.intersects(Flags::IO_STOPPING) {
Poll::Ready(WriteStatus::Shutdown(
self.0 .0.disconnect_timeout.get().into(),
))
} else if flags.contains(Flags::IO_STOPPING_FILTERS)
&& !flags.contains(Flags::IO_FILTERS_TIMEOUT)
{
flags.insert(Flags::IO_FILTERS_TIMEOUT);
self.0.set_flags(flags);
Poll::Ready(WriteStatus::Timeout(
self.0 .0.disconnect_timeout.get().into(),
))
} else if flags.intersects(Flags::WR_PAUSED) {
Poll::Pending
} else {
Poll::Ready(WriteStatus::Ready)
}
}
}

Expand Down
4 changes: 4 additions & 0 deletions ntex-io/src/ioref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ impl IoRef {
.intersects(Flags::IO_STOPPING | Flags::IO_STOPPED)
}

pub(crate) fn is_io_closed(&self) -> bool {
self.0.flags.get().intersects(Flags::IO_STOPPED)
}

#[inline]
/// Check if write back-pressure is enabled
pub fn is_wr_backpressure(&self) -> bool {
Expand Down
11 changes: 11 additions & 0 deletions ntex-io/src/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,17 @@ impl WriteContext {
self.0.filter().poll_write_ready(cx)
}

#[inline]
/// Check if io is closed
pub fn poll_close(&self, cx: &mut Context<'_>) -> Poll<()> {
if self.0.is_io_closed() {
Poll::Ready(())
} else {
self.0 .0.write_task.register(cx.waker());
Poll::Pending
}
}

/// Get write buffer
pub fn with_buf<F>(&self, f: F) -> Poll<io::Result<()>>
where
Expand Down
4 changes: 2 additions & 2 deletions ntex-net/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ async-std = ["ntex-rt/async-std", "ntex-async-std"]
ntex-service = "3"
ntex-bytes = "0.1"
ntex-http = "0.1"
ntex-io = "2.3"
ntex-io = "2.4"
ntex-rt = "0.4.14"
ntex-util = "2"

ntex-tokio = { version = "0.5", optional = true }
ntex-tokio = { version = "0.5.1", optional = true }
ntex-compio = { version = "0.1", optional = true }
ntex-glommio = { version = "0.5", optional = true }
ntex-async-std = { version = "0.5", optional = true }
Expand Down
4 changes: 4 additions & 0 deletions ntex-tokio/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [0.5.1] - 2024-09-06

* Stop write task if io is closed

## [0.4.0] - 2024-01-09

* Log io tags
Expand Down
6 changes: 3 additions & 3 deletions ntex-tokio/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-tokio"
version = "0.5.0"
version = "0.5.1"
authors = ["ntex contributors <[email protected]>"]
description = "tokio intergration for ntex framework"
keywords = ["network", "framework", "async", "futures"]
Expand All @@ -17,7 +17,7 @@ path = "src/lib.rs"

[dependencies]
ntex-bytes = "0.1"
ntex-io = "2.0"
ntex-util = "2.0"
ntex-io = "2.4"
ntex-util = "2"
log = "0.4"
tokio = { version = "1", default-features = false, features = ["rt", "net", "sync", "signal"] }
14 changes: 14 additions & 0 deletions ntex-tokio/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ impl Future for WriteTask {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.as_mut().get_mut();

if this.state.poll_close(cx).is_ready() {
return Poll::Ready(());
}

match this.st {
IoWriteState::Processing(ref mut delay) => {
match ready!(this.state.poll_ready(cx)) {
Expand Down Expand Up @@ -215,6 +219,9 @@ impl Future for WriteTask {
// close WRITE side and wait for disconnect on read side.
// use disconnect timeout, otherwise it could hang forever.
loop {
if this.state.poll_close(cx).is_ready() {
return Poll::Ready(());
}
match st {
Shutdown::None => {
// flush write buffer
Expand Down Expand Up @@ -564,6 +571,10 @@ mod unixstream {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.as_mut().get_mut();

if this.state.poll_close(cx).is_ready() {
return Poll::Ready(());
}

match this.st {
IoWriteState::Processing(ref mut delay) => {
match this.state.poll_ready(cx) {
Expand Down Expand Up @@ -630,6 +641,9 @@ mod unixstream {
// close WRITE side and wait for disconnect on read side.
// use disconnect timeout, otherwise it could hang forever.
loop {
if this.state.poll_close(cx).is_ready() {
return Poll::Ready(());
}
match st {
Shutdown::None => {
// flush write buffer
Expand Down
2 changes: 1 addition & 1 deletion ntex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ ntex-bytes = "0.1.27"
ntex-server = "2.3"
ntex-h2 = "1.1"
ntex-rt = "0.4.15"
ntex-io = "2.3"
ntex-io = "2.4"
ntex-net = "2.1"
ntex-tls = "2.1"

Expand Down
2 changes: 1 addition & 1 deletion ntex/src/http/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ where
///
/// To disable timeout set value to 0.
///
/// By default disconnect timeout is set to 3 seconds.
/// By default disconnect timeout is set to 1 seconds.
pub fn disconnect_timeout(mut self, timeout: Seconds) -> Self {
self.config.disconnect_timeout(timeout);
self
Expand Down

0 comments on commit db6d3a6

Please sign in to comment.