Skip to content

Commit

Permalink
refactor framed api
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Mar 15, 2021
1 parent 8b7f11c commit 295286d
Show file tree
Hide file tree
Showing 13 changed files with 614 additions and 375 deletions.
4 changes: 4 additions & 0 deletions ntex/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [0.3.9] - 2021-03-15

* framed: refactor api

## [0.3.8] - 2021-03-11

* http: fix expect/continue support, wake up write task
Expand Down
2 changes: 1 addition & 1 deletion ntex/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex"
version = "0.3.8"
version = "0.3.9"
authors = ["ntex contributors <[email protected]>"]
description = "Framework for composable network services"
readme = "README.md"
Expand Down
2 changes: 1 addition & 1 deletion ntex/src/connect/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ impl<T: Address> TcpConnectorResponse<T> {

fn can_continue(&self, err: &io::Error) -> bool {
trace!(
"TCP connector - failed to connect to connecting to {:?} port: {} err: {:?}",
"TCP connector - failed to connect to {:?} port: {} err: {:?}",
self.req.as_ref().unwrap().host(),
self.port,
err
Expand Down
83 changes: 48 additions & 35 deletions ntex/src/framed/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use either::Either;
use futures::{ready, Future, FutureExt};

use crate::codec::{AsyncRead, AsyncWrite, Decoder, Encoder};
use crate::framed::{DispatchItem, ReadTask, State, Timer, WriteTask};
use crate::framed::{DispatchItem, Read, ReadTask, State, Timer, Write, WriteTask};
use crate::service::{IntoService, Service};

type Response<U> = <U as Encoder>::Item;
Expand Down Expand Up @@ -184,14 +184,14 @@ where
U: Encoder + Decoder,
<U as Encoder>::Item: 'static,
{
fn handle_result(&self, item: Result<S::Response, S::Error>, state: &State) {
fn handle_result(&self, item: Result<S::Response, S::Error>, write: Write<'_>) {
self.inflight.set(self.inflight.get() - 1);
match state.write_result(item, &self.codec) {
match write.encode_result(item, &self.codec) {
Ok(true) => (),
Ok(false) => state.enable_write_backpressure(),
Ok(false) => write.enable_backpressure(None),
Err(err) => self.error.set(Some(err.into())),
}
state.dsp_wake_task();
write.wake_dispatcher();
}
}

Expand All @@ -207,6 +207,8 @@ where
let mut this = self.as_mut().project();
let slf = &this.inner;
let state = &slf.state;
let read = state.read();
let write = state.write();

// handle service response future
if let Some(fut) = this.fut.as_mut().as_pin_mut() {
Expand All @@ -215,31 +217,31 @@ where
Poll::Ready(item) => {
this.fut.set(None);
slf.shared.inflight.set(slf.shared.inflight.get() - 1);
slf.handle_result(item);
slf.handle_result(item, write);
}
}
}

loop {
match slf.st.get() {
DispatcherState::Processing => {
let item = match ready!(slf.poll_service(&this.service, cx)) {
let item = match ready!(slf.poll_service(&this.service, cx, read)) {
PollService::Ready => {
if !state.is_write_ready() {
if !write.is_ready() {
// instruct write task to notify dispatcher when data is flushed
state.dsp_enable_write_backpressure(cx.waker());
write.enable_backpressure(Some(cx.waker()));
slf.st.set(DispatcherState::Backpressure);
DispatchItem::WBackPressureEnabled
} else if state.is_read_ready() {
} else if read.is_ready() {
// decode incoming bytes if buffer is ready
match state.decode_item(&slf.shared.codec) {
match read.decode(&slf.shared.codec) {
Ok(Some(el)) => {
slf.update_keepalive();
DispatchItem::Item(el)
}
Ok(None) => {
log::trace!("not enough data to decode next frame, register dispatch task");
state.dsp_read_more_data(cx.waker());
read.wake(cx.waker());
return Poll::Pending;
}
Err(err) => {
Expand All @@ -250,7 +252,7 @@ where
}
} else {
// no new events
state.dsp_register_task(cx.waker());
state.register_dispatcher(cx.waker());
return Poll::Pending;
}
}
Expand All @@ -265,7 +267,7 @@ where
match this.fut.as_mut().as_pin_mut().unwrap().poll(cx) {
Poll::Ready(res) => {
this.fut.set(None);
slf.handle_result(res);
slf.handle_result(res, write);
}
Poll::Pending => {
slf.shared.inflight.set(slf.shared.inflight.get() + 1)
Expand All @@ -277,9 +279,9 @@ where
}
// handle write back-pressure
DispatcherState::Backpressure => {
let item = match ready!(slf.poll_service(&this.service, cx)) {
let item = match ready!(slf.poll_service(&this.service, cx, read)) {
PollService::Ready => {
if state.is_write_ready() {
if write.is_ready() {
slf.st.set(DispatcherState::Processing);
DispatchItem::WBackPressureDisabled
} else {
Expand All @@ -297,7 +299,7 @@ where
match this.fut.as_mut().as_pin_mut().unwrap().poll(cx) {
Poll::Ready(res) => {
this.fut.set(None);
slf.handle_result(res);
slf.handle_result(res, write);
}
Poll::Pending => {
slf.shared.inflight.set(slf.shared.inflight.get() + 1)
Expand All @@ -316,7 +318,7 @@ where
slf.st.set(DispatcherState::Shutdown);
state.shutdown_io();
} else {
state.dsp_register_task(cx.waker());
state.register_dispatcher(cx.waker());
return Poll::Pending;
}
}
Expand Down Expand Up @@ -353,13 +355,17 @@ where

let st = self.state.clone();
let shared = self.shared.clone();
crate::rt::spawn(fut.map(move |item| shared.handle_result(item, &st)));
crate::rt::spawn(fut.map(move |item| shared.handle_result(item, st.write())));
}

fn handle_result(&self, item: Result<Option<<U as Encoder>::Item>, S::Error>) {
match self.state.write_result(item, &self.shared.codec) {
fn handle_result(
&self,
item: Result<Option<<U as Encoder>::Item>, S::Error>,
write: Write<'_>,
) {
match write.encode_result(item, &self.shared.codec) {
Ok(true) => (),
Ok(false) => self.state.enable_write_backpressure(),
Ok(false) => write.enable_backpressure(None),
Err(Either::Left(err)) => {
self.error.set(Some(err));
}
Expand All @@ -369,11 +375,16 @@ where
}
}

fn poll_service(&self, srv: &S, cx: &mut Context<'_>) -> Poll<PollService<U>> {
fn poll_service(
&self,
srv: &S,
cx: &mut Context<'_>,
read: Read<'_>,
) -> Poll<PollService<U>> {
match srv.poll_ready(cx) {
Poll::Ready(Ok(_)) => {
// service is ready, wake io read task
self.state.dsp_restart_read_task();
read.resume();

// check keepalive timeout
self.check_keepalive();
Expand All @@ -396,7 +407,7 @@ where
PollService::ServiceError
}
}
} else if self.state.is_dsp_stopped() {
} else if self.state.is_dispatcher_stopped() {
log::trace!("dispatcher is instructed to stop");

self.unregister_keepalive();
Expand All @@ -415,7 +426,7 @@ where
// pause io read task
Poll::Pending => {
log::trace!("service is not ready, register dispatch task");
self.state.dsp_service_not_ready(cx.waker());
read.pause(cx.waker());
Poll::Pending
}
// handle service readiness error
Expand Down Expand Up @@ -590,7 +601,8 @@ mod tests {
assert_eq!(buf, Bytes::from_static(b"GET /test HTTP/1\r\n\r\n"));

assert!(st
.write_item(Bytes::from_static(b"test"), &mut BytesCodec)
.write()
.encode(Bytes::from_static(b"test"), &mut BytesCodec)
.is_ok());
let buf = client.read().await.unwrap();
assert_eq!(buf, Bytes::from_static(b"test"));
Expand All @@ -614,7 +626,8 @@ mod tests {
}),
);
state
.write_item(
.write()
.encode(
Bytes::from_static(b"GET /test HTTP/1\r\n\r\n"),
&mut BytesCodec,
)
Expand Down Expand Up @@ -672,7 +685,7 @@ mod tests {
}
}),
);
state.set_write_high_watermark(16 * 1024);
state.set_buffer_sizes(8 * 1024, 16 * 1024, 1024);
crate::rt::spawn(disp.map(|_| ()));

let buf = client.read_any();
Expand All @@ -684,19 +697,19 @@ mod tests {
assert_eq!(client.remote_buffer(|buf| buf.len()), 0);

// response message
assert!(!state.is_write_ready());
assert_eq!(state.with_write_buf(|buf| buf.len()), 65536);
assert!(!state.write().is_ready());
assert_eq!(state.write().with_buf(|buf| buf.len()), 65536);

client.remote_buffer_cap(10240);
sleep(Duration::from_millis(50)).await;
assert_eq!(state.with_write_buf(|buf| buf.len()), 55296);
assert_eq!(state.write().with_buf(|buf| buf.len()), 55296);

client.remote_buffer_cap(45056);
sleep(Duration::from_millis(50)).await;
assert_eq!(state.with_write_buf(|buf| buf.len()), 10240);
assert_eq!(state.write().with_buf(|buf| buf.len()), 10240);

// backpressure disabled
assert!(state.is_write_ready());
assert!(state.write().is_ready());
assert_eq!(&data.lock().unwrap().borrow()[..], &[0, 1, 2]);
}

Expand Down Expand Up @@ -732,7 +745,7 @@ mod tests {
);
crate::rt::spawn(disp.keepalive_timeout(0).keepalive_timeout(1).map(|_| ()));

let state = state.disconnect_timeout(1);
state.set_disconnect_timeout(1);

let buf = client.read().await.unwrap();
assert_eq!(buf, Bytes::from_static(b"GET /test HTTP/1\r\n\r\n"));
Expand Down
2 changes: 1 addition & 1 deletion ntex/src/framed/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ mod write;

pub use self::dispatcher::Dispatcher;
pub use self::read::ReadTask;
pub use self::state::State;
pub use self::state::{Read, State, Write};
pub use self::time::Timer;
pub use self::write::WriteTask;

Expand Down
2 changes: 1 addition & 1 deletion ntex/src/framed/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ where
log::trace!("read task is instructed to shutdown");
Poll::Ready(())
} else if self.state.is_io_stop() {
self.state.dsp_wake_task();
self.state.wake_dispatcher();
Poll::Ready(())
} else if self.state.is_read_paused() {
self.state.register_read_task(cx.waker());
Expand Down
Loading

0 comments on commit 295286d

Please sign in to comment.