Skip to content

Commit

Permalink
H1 dispatcher fixes (#35)
Browse files Browse the repository at this point in the history
* Flush and close io after ws handler exit

* Deprecate ntex::util::order
  • Loading branch information
fafhrd91 authored Jan 13, 2021
1 parent 6e14c9f commit 263691b
Show file tree
Hide file tree
Showing 9 changed files with 30 additions and 38 deletions.
6 changes: 6 additions & 0 deletions ntex/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changes

## [0.1.28] - 2021-01-14

* Flush and close io after ws handler exit

* Deprecate ntex::util::order

## [0.1.27] - 2021-01-13

* Use ahash instead of fxhash
Expand Down
4 changes: 2 additions & 2 deletions ntex/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex"
version = "0.1.27"
version = "0.1.28"
authors = ["ntex contributors <[email protected]>"]
description = "Framework for composable network services"
readme = "README.md"
Expand Down Expand Up @@ -48,7 +48,7 @@ base64 = "0.13"
bitflags = "1.2.1"
bytes = "0.5.6"
bytestring = "0.1.5"
derive_more = "0.99.5"
derive_more = "0.99.11"
either = "1.5.3"
encoding_rs = "0.8.26"
futures = "0.3.9"
Expand Down
13 changes: 2 additions & 11 deletions ntex/src/channel/condition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use super::cell::Cell;
use crate::task::LocalWaker;

/// Condition allows to notify multiple waiters at the same time
#[derive(Clone)]
pub struct Condition(Cell<Inner>);

struct Inner {
Expand Down Expand Up @@ -86,17 +87,7 @@ impl Future for Waiter {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();

let inner = unsafe { this.inner.get_mut().data.get_unchecked_mut(this.token) };
if inner.is_none() {
let waker = LocalWaker::default();
waker.register(cx.waker());
*inner = Some(waker);
} else if !inner.as_mut().unwrap().register(cx.waker()) {
return Poll::Ready(());
}
Poll::Pending
self.get_mut().poll_waiter(cx)
}
}

Expand Down
13 changes: 0 additions & 13 deletions ntex/src/channel/oneshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,6 @@ pub use futures::channel::oneshot::Canceled;
use super::cell::Cell;
use crate::task::LocalWaker;

#[doc(hidden)]
#[deprecated(since = "0.1.16", note = "Use pool::create instead")]
pub use super::pool::new as pool;
#[doc(hidden)]
#[deprecated(since = "0.1.16", note = "Use pool::Pool instead")]
pub use super::pool::Pool;
#[doc(hidden)]
#[deprecated(since = "0.1.16", note = "Use pool::Receiver instead")]
pub use super::pool::Receiver as PReceiver;
#[doc(hidden)]
#[deprecated(since = "0.1.16", note = "Use pool::Sender instead")]
pub use super::pool::Sender as PSender;

/// Creates a new futures-aware, one-shot channel.
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let inner = Cell::new(Inner {
Expand Down
1 change: 1 addition & 0 deletions ntex/src/channel/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ struct Inner<T> {
}

impl<T> Pool<T> {
/// Create a new one-shot channel.
pub fn channel(&self) -> (Sender<T>, Receiver<T>) {
let token = self.0.get_mut().insert(Inner {
flags: Flags::all(),
Expand Down
15 changes: 11 additions & 4 deletions ntex/src/http/h1/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,10 +239,17 @@ where

// handle upgrade request
if this.inner.flags.contains(Flags::UPGRADE) {
return this.upgrade.as_pin_mut().unwrap().poll(cx).map_err(|e| {
error!("Upgrade handler error: {}", e);
DispatchError::Upgrade
});
let result =
ready!(this.upgrade.as_pin_mut().unwrap().poll(cx)).map_err(|e| {
error!("Upgrade handler error: {}", e);
DispatchError::Upgrade
});
if result.is_err() {
return Poll::Ready(result);
}
this.inner.flags.remove(Flags::UPGRADE);
this.inner.flags.insert(Flags::SHUTDOWN);
return this.inner.poll_shutdown(cx);
}

// shutdown process
Expand Down
4 changes: 2 additions & 2 deletions ntex/src/server/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ pub fn test_server<F: StreamServiceFactory<TcpStream>>(factory: F) -> TestServer
}

/// Start new server with server builder
pub fn build_test_server<F>(mut factory: F) -> TestServer
pub fn build_test_server<F>(factory: F) -> TestServer
where
F: FnMut(ServerBuilder) -> ServerBuilder + Send + 'static,
F: FnOnce(ServerBuilder) -> ServerBuilder + Send + 'static,
{
let (tx, rx) = mpsc::channel();

Expand Down
7 changes: 2 additions & 5 deletions ntex/src/task.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
//! A synchronization primitive for task wakeup.
use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::task::Waker;
use std::{fmt, rc};
use std::{cell::UnsafeCell, fmt, marker::PhantomData, rc, task::Waker};

/// A synchronization primitive for task wakeup.
///
Expand All @@ -18,7 +15,7 @@ use std::{fmt, rc};
/// differs from the usual `thread::park` pattern). It is also permitted for
/// `wake` to be called **before** `register`. This results in a no-op.
///
/// A single `AtomicWaker` may be reused for any number of calls to `register` or
/// A single `LocalWaker` may be reused for any number of calls to `register` or
/// `wake`.
#[derive(Default)]
pub struct LocalWaker {
Expand Down
5 changes: 4 additions & 1 deletion ntex/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ mod extensions;
pub mod framed;
pub mod inflight;
pub mod keepalive;
pub mod order;
pub mod stream;
pub mod time;
pub mod timeout;
pub mod variant;

pub use self::either::either;
pub use self::extensions::Extensions;

#[doc(hidden)]
#[deprecated(since = "0.1.27")]
pub mod order;

0 comments on commit 263691b

Please sign in to comment.