Skip to content

Commit

Permalink
Allow to use framed write task for io flushing
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Jan 20, 2021
1 parent 7153310 commit 20f3840
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 9 deletions.
5 changes: 4 additions & 1 deletion ntex-codec/src/framed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,10 @@ where
Poll::Pending => break,
Poll::Ready(Ok(n)) => {
if n == 0 {
log::trace!("Disconnected during flush, written {}", written);
log::trace!(
"Disconnected during flush, written {}",
written
);
self.flags.insert(Flags::DISCONNECTED);
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::WriteZero,
Expand Down
4 changes: 4 additions & 0 deletions ntex/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [0.2.0-b.3] - 2021-01-21

* Allow to use framed write task for io flushing

## [0.2.0-b.2] - 2021-01-20

* Fix flush framed write task
Expand Down
4 changes: 2 additions & 2 deletions ntex/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex"
version = "0.2.0-b.2"
version = "0.2.0-b.3"
authors = ["ntex contributors <[email protected]>"]
description = "Framework for composable network services"
readme = "README.md"
Expand Down Expand Up @@ -36,7 +36,7 @@ compress = ["flate2", "brotli2"]
cookie = ["coo-kie", "coo-kie/percent-encode"]

[dependencies]
ntex-codec = "0.2.1"
ntex-codec = "0.2.2"
ntex-rt = "0.1.1"
ntex-rt-macros = "0.1"
ntex-router = "0.3.8"
Expand Down
2 changes: 2 additions & 0 deletions ntex/src/framed/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ where

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.state.is_io_shutdown() {
log::trace!("read task is instructed to shutdown");
Poll::Ready(())
} else if self.state.is_read_paused() {
self.state.register_read_task(cx.waker());
Expand All @@ -50,6 +51,7 @@ where
Poll::Pending
}
Err(err) => {
log::trace!("error during reading data: {:?}", err);
self.state.set_io_error(err);
Poll::Ready(())
}
Expand Down
30 changes: 25 additions & 5 deletions ntex/src/framed/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,14 +367,25 @@ impl<U> State<U> {
self.0.dispatch_task.register(waker);
}

pub(crate) fn with_read_buf<F, R>(&self, f: F) -> R
fn mark_io_error(&self) {
self.0.read_task.wake();
self.0.write_task.wake();
self.0.dispatch_task.wake();
let mut flags = self.0.flags.get();
flags.insert(Flags::IO_ERR | Flags::DSP_STOP);
self.0.flags.set(flags);
}

#[inline]
pub fn with_read_buf<F, R>(&self, f: F) -> R
where
F: FnOnce(&mut BytesMut) -> R,
{
f(&mut self.0.read_buf.borrow_mut())
}

pub(crate) fn with_write_buf<F, R>(&self, f: F) -> R
#[inline]
pub fn with_write_buf<F, R>(&self, f: F) -> R
where
F: FnOnce(&mut BytesMut) -> R,
{
Expand Down Expand Up @@ -448,7 +459,10 @@ where
continue;
}
}
Err(err) => Err(Either::Left(err)),
Err(err) => {
self.mark_io_error();
Err(Either::Left(err))
}
};
}
}
Expand Down Expand Up @@ -479,7 +493,10 @@ where
continue;
}
}
Err(err) => Poll::Ready(Err(Either::Left(err))),
Err(err) => {
self.mark_io_error();
Poll::Ready(Err(Either::Left(err)))
}
};
}
}
Expand All @@ -502,7 +519,10 @@ where
let st = self.0.clone();
poll_fn(|cx| flush(io, &mut st.write_buf.borrow_mut(), cx))
.await
.map_err(Either::Right)
.map_err(|e| {
self.mark_io_error();
Either::Right(e)
})
}

#[inline]
Expand Down
24 changes: 23 additions & 1 deletion ntex/src/framed/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,21 @@ where
st: IoWriteState::Processing,
}
}

/// Shutdown io stream
pub fn shutdown(io: Rc<RefCell<T>>, state: State<U>) -> Self {
let disconnect_timeout = state.disconnect_timeout() as u64;
let st = IoWriteState::Shutdown(
if disconnect_timeout != 0 {
Some(delay_for(Duration::from_millis(disconnect_timeout)))
} else {
None
},
Shutdown::None,
);

Self { io, st, state }
}
}

impl<T, U> Future for FramedWriteTask<T, U>
Expand Down Expand Up @@ -96,7 +111,7 @@ where
}
}
Poll::Ready(Err(err)) => {
log::trace!("error sending data: {:?}", err);
log::trace!("error during sending data: {:?}", err);
this.state.set_io_error(Some(err));
return Poll::Ready(());
}
Expand Down Expand Up @@ -124,6 +139,9 @@ where
}
Poll::Ready(Err(_)) => {
this.state.set_wr_shutdown_complete();
log::trace!(
"write task is closed with err during flush"
);
return Poll::Ready(());
}
_ => (),
Expand All @@ -139,6 +157,9 @@ where
}
Poll::Ready(Err(_)) => {
this.state.set_wr_shutdown_complete();
log::trace!(
"write task is closed with err during shutdown"
);
return Poll::Ready(());
}
_ => (),
Expand All @@ -152,6 +173,7 @@ where
match Pin::new(&mut *io).poll_read(cx, &mut buf) {
Poll::Ready(Ok(0)) | Poll::Ready(Err(_)) => {
this.state.set_wr_shutdown_complete();
log::trace!("write task is closed");
return Poll::Ready(());
}
Poll::Pending => break,
Expand Down

0 comments on commit 20f3840

Please sign in to comment.