From 20f38402ab3ceb5e6aa011513f32da9e88ed97f5 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Wed, 20 Jan 2021 23:40:19 +0600 Subject: [PATCH] Allow to use framed write task for io flushing --- ntex-codec/src/framed.rs | 5 ++++- ntex/CHANGES.md | 4 ++++ ntex/Cargo.toml | 4 ++-- ntex/src/framed/read.rs | 2 ++ ntex/src/framed/state.rs | 30 +++++++++++++++++++++++++----- ntex/src/framed/write.rs | 24 +++++++++++++++++++++++- 6 files changed, 60 insertions(+), 9 deletions(-) diff --git a/ntex-codec/src/framed.rs b/ntex-codec/src/framed.rs index e541d5927..061417e77 100644 --- a/ntex-codec/src/framed.rs +++ b/ntex-codec/src/framed.rs @@ -240,7 +240,10 @@ where Poll::Pending => break, Poll::Ready(Ok(n)) => { if n == 0 { - log::trace!("Disconnected during flush, written {}", written); + log::trace!( + "Disconnected during flush, written {}", + written + ); self.flags.insert(Flags::DISCONNECTED); return Poll::Ready(Err(io::Error::new( io::ErrorKind::WriteZero, diff --git a/ntex/CHANGES.md b/ntex/CHANGES.md index b1e8ee457..968a5d643 100644 --- a/ntex/CHANGES.md +++ b/ntex/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.2.0-b.3] - 2021-01-21 + +* Allow to use framed write task for io flushing + ## [0.2.0-b.2] - 2021-01-20 * Fix flush framed write task diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index bfb0fe655..3a439f918 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex" -version = "0.2.0-b.2" +version = "0.2.0-b.3" authors = ["ntex contributors "] description = "Framework for composable network services" readme = "README.md" @@ -36,7 +36,7 @@ compress = ["flate2", "brotli2"] cookie = ["coo-kie", "coo-kie/percent-encode"] [dependencies] -ntex-codec = "0.2.1" +ntex-codec = "0.2.2" ntex-rt = "0.1.1" ntex-rt-macros = "0.1" ntex-router = "0.3.8" diff --git a/ntex/src/framed/read.rs b/ntex/src/framed/read.rs index 29da43c3e..793441096 100644 --- a/ntex/src/framed/read.rs +++ b/ntex/src/framed/read.rs @@ -37,6 +37,7 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { if self.state.is_io_shutdown() { + log::trace!("read task is instructed to shutdown"); Poll::Ready(()) } else if self.state.is_read_paused() { self.state.register_read_task(cx.waker()); @@ -50,6 +51,7 @@ where Poll::Pending } Err(err) => { + log::trace!("error during reading data: {:?}", err); self.state.set_io_error(err); Poll::Ready(()) } diff --git a/ntex/src/framed/state.rs b/ntex/src/framed/state.rs index 836c5ba5c..42c26296b 100644 --- a/ntex/src/framed/state.rs +++ b/ntex/src/framed/state.rs @@ -367,14 +367,25 @@ impl State { self.0.dispatch_task.register(waker); } - pub(crate) fn with_read_buf(&self, f: F) -> R + fn mark_io_error(&self) { + self.0.read_task.wake(); + self.0.write_task.wake(); + self.0.dispatch_task.wake(); + let mut flags = self.0.flags.get(); + flags.insert(Flags::IO_ERR | Flags::DSP_STOP); + self.0.flags.set(flags); + } + + #[inline] + pub fn with_read_buf(&self, f: F) -> R where F: FnOnce(&mut BytesMut) -> R, { f(&mut self.0.read_buf.borrow_mut()) } - pub(crate) fn with_write_buf(&self, f: F) -> R + #[inline] + pub fn with_write_buf(&self, f: F) -> R where F: FnOnce(&mut BytesMut) -> R, { @@ -448,7 +459,10 @@ where continue; } } - Err(err) => Err(Either::Left(err)), + Err(err) => { + self.mark_io_error(); + Err(Either::Left(err)) + } }; } } @@ -479,7 +493,10 @@ where continue; } } - Err(err) => Poll::Ready(Err(Either::Left(err))), + Err(err) => { + self.mark_io_error(); + Poll::Ready(Err(Either::Left(err))) + } }; } } @@ -502,7 +519,10 @@ where let st = self.0.clone(); poll_fn(|cx| flush(io, &mut st.write_buf.borrow_mut(), cx)) .await - .map_err(Either::Right) + .map_err(|e| { + self.mark_io_error(); + Either::Right(e) + }) } #[inline] diff --git a/ntex/src/framed/write.rs b/ntex/src/framed/write.rs index cc5714198..a76a1be46 100644 --- a/ntex/src/framed/write.rs +++ b/ntex/src/framed/write.rs @@ -48,6 +48,21 @@ where st: IoWriteState::Processing, } } + + /// Shutdown io stream + pub fn shutdown(io: Rc>, state: State) -> Self { + let disconnect_timeout = state.disconnect_timeout() as u64; + let st = IoWriteState::Shutdown( + if disconnect_timeout != 0 { + Some(delay_for(Duration::from_millis(disconnect_timeout))) + } else { + None + }, + Shutdown::None, + ); + + Self { io, st, state } + } } impl Future for FramedWriteTask @@ -96,7 +111,7 @@ where } } Poll::Ready(Err(err)) => { - log::trace!("error sending data: {:?}", err); + log::trace!("error during sending data: {:?}", err); this.state.set_io_error(Some(err)); return Poll::Ready(()); } @@ -124,6 +139,9 @@ where } Poll::Ready(Err(_)) => { this.state.set_wr_shutdown_complete(); + log::trace!( + "write task is closed with err during flush" + ); return Poll::Ready(()); } _ => (), @@ -139,6 +157,9 @@ where } Poll::Ready(Err(_)) => { this.state.set_wr_shutdown_complete(); + log::trace!( + "write task is closed with err during shutdown" + ); return Poll::Ready(()); } _ => (), @@ -152,6 +173,7 @@ where match Pin::new(&mut *io).poll_read(cx, &mut buf) { Poll::Ready(Ok(0)) | Poll::Ready(Err(_)) => { this.state.set_wr_shutdown_complete(); + log::trace!("write task is closed"); return Poll::Ready(()); } Poll::Pending => break,