Skip to content

Commit

Permalink
expose server status change notifications
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Jun 3, 2021
1 parent 3e480c0 commit f4006c7
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 8 deletions.
4 changes: 4 additions & 0 deletions ntex/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [0.3.18] - 2021-06-03

* server: expose server status change notifications

## [0.3.17] - 2021-05-24

* framed: add read/write bytes pool
Expand Down
8 changes: 4 additions & 4 deletions ntex/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex"
version = "0.3.17"
version = "0.3.18"
authors = ["ntex contributors <[email protected]>"]
description = "Framework for composable network services"
readme = "README.md"
Expand Down Expand Up @@ -45,19 +45,19 @@ http-framework = ["h2", "http", "httparse",
[dependencies]
ntex-codec = "0.4.1"
ntex-rt = "0.2.2"
ntex-router = "0.4.2"
ntex-router = "0.4.3"
ntex-service = "0.1.9"
ntex-macros = "0.1.3"
ntex-util = "0.1.1"

ahash = "0.7.4"
base64 = "0.13"
bitflags = "1.2"
bytes = "1.0"
bytestring = { version = "1.0", features = ["serde"] }
derive_more = "0.99.13"
futures-core = { version = "0.3.15", default-features = false, features = ["alloc"] }
futures-sink = { version = "0.3.15", default-features = false, features = ["alloc"] }
ahash = "0.7.3"
log = "0.4"
mio = "0.7.10"
num_cpus = "1.13"
Expand Down Expand Up @@ -104,4 +104,4 @@ time = "0.2"
open-ssl = { version="0.10", package = "openssl" }
rust-tls = { version = "0.19", package="rustls", features = ["dangerous_configuration"] }
webpki = "0.21"
futures = "0.3.13"
futures = "0.3.15"
43 changes: 40 additions & 3 deletions ntex/src/server/accept.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::rt::System;

use super::socket::{Listener, SocketAddr};
use super::worker::{Connection, WorkerClient};
use super::{Server, Token};
use super::{Server, ServerStatus, Token};

const DELTA: usize = 100;
const NOTIFY: mio::Token = mio::Token(0);
Expand Down Expand Up @@ -49,6 +49,7 @@ impl AcceptNotify {
pub(super) struct AcceptLoop {
notify: AcceptNotify,
inner: Option<(sync_mpsc::Receiver<Command>, mio::Poll, Server)>,
status_handler: Option<Box<dyn FnMut(ServerStatus) + Send>>,
}

impl AcceptLoop {
Expand All @@ -69,6 +70,7 @@ impl AcceptLoop {
AcceptLoop {
notify,
inner: Some((rx, poll, srv)),
status_handler: None,
}
}

Expand All @@ -80,6 +82,13 @@ impl AcceptLoop {
self.notify.clone()
}

pub(super) fn set_status_handler<F>(&mut self, f: F)
where
F: FnMut(ServerStatus) + Send + 'static,
{
self.status_handler = Some(Box::new(f));
}

pub(super) fn start(
&mut self,
socks: Vec<(Token, Listener)>,
Expand All @@ -89,8 +98,17 @@ impl AcceptLoop {
.inner
.take()
.expect("AcceptLoop cannot be used multiple times");
let status_handler = self.status_handler.take();

Accept::start(rx, poll, socks, srv, workers, self.notify.clone());
Accept::start(
rx,
poll,
socks,
srv,
workers,
self.notify.clone(),
status_handler,
);
}
}

Expand All @@ -103,6 +121,7 @@ struct Accept {
notify: AcceptNotify,
next: usize,
backpressure: bool,
status_handler: Option<Box<dyn FnMut(ServerStatus) + Send>>,
}

/// This function defines errors that are per-connection. Which basically
Expand All @@ -126,6 +145,7 @@ impl Accept {
srv: Server,
workers: Vec<WorkerClient>,
notify: AcceptNotify,
status_handler: Option<Box<dyn FnMut(ServerStatus) + Send>>,
) {
let sys = System::current();

Expand All @@ -134,7 +154,7 @@ impl Accept {
.name("ntex-server accept loop".to_owned())
.spawn(move || {
System::set_current(sys);
Accept::new(rx, poll, socks, workers, srv, notify).poll()
Accept::new(rx, poll, socks, workers, srv, notify, status_handler).poll()
});
}

Expand All @@ -145,6 +165,7 @@ impl Accept {
workers: Vec<WorkerClient>,
srv: Server,
notify: AcceptNotify,
status_handler: Option<Box<dyn FnMut(ServerStatus) + Send>>,
) -> Accept {
// Start accept
let mut sockets = Slab::new();
Expand Down Expand Up @@ -177,11 +198,18 @@ impl Accept {
workers,
notify,
srv,
status_handler,
next: 0,
backpressure: false,
}
}

fn update_status(&mut self, st: ServerStatus) {
if let Some(ref mut hnd) = self.status_handler {
(&mut *hnd)(st)
}
}

fn poll(&mut self) {
trace!("Starting server accept loop");

Expand Down Expand Up @@ -258,6 +286,7 @@ impl Accept {
info!("Paused accepting connections on {}", info.addr);
}
}
self.update_status(ServerStatus::NotReady);
}
Command::Resume => {
for (token, info) in self.sockets.iter_mut() {
Expand All @@ -274,12 +303,14 @@ impl Accept {
);
}
}
self.update_status(ServerStatus::Ready);
}
Command::Stop => {
for (_, info) in self.sockets.iter_mut() {
trace!("Stopping socket listener: {}", info.addr);
let _ = self.poll.registry().deregister(&mut info.sock);
}
self.update_status(ServerStatus::NotReady);
return false;
}
Command::Worker(worker) => {
Expand Down Expand Up @@ -308,6 +339,12 @@ impl Accept {
}

fn backpressure(&mut self, on: bool) {
self.update_status(if on {
ServerStatus::NotReady
} else {
ServerStatus::Ready
});

if self.backpressure {
if !on {
self.backpressure = false;
Expand Down
14 changes: 13 additions & 1 deletion ntex/src/server/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use super::service::{Factory, InternalServiceFactory, StreamServiceFactory};
use super::signals::{Signal, Signals};
use super::socket::Listener;
use super::worker::{self, Worker, WorkerAvailability, WorkerClient};
use super::{Server, ServerCommand, Token};
use super::{Server, ServerCommand, ServerStatus, Token};

const STOP_DELAY: Duration = Duration::from_millis(300);

Expand Down Expand Up @@ -128,6 +128,18 @@ impl ServerBuilder {
self
}

#[doc(hidden)]
/// Set server status handler.
///
/// Server calls this handler on every inner status update.
pub fn status_handler<F>(mut self, handler: F) -> Self
where
F: FnMut(ServerStatus) + Send + 'static,
{
self.accept.set_status_handler(handler);
self
}

/// Execute external configuration as part of the server building
/// process.
///
Expand Down
6 changes: 6 additions & 0 deletions ntex/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ pub use self::test::{build_test_server, test_server, TestServer};
#[doc(hidden)]
pub use self::socket::FromStream;

/// Server readiness status
pub enum ServerStatus {
Ready,
NotReady,
}

/// Socket id token
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub(self) struct Token(usize);
Expand Down

0 comments on commit f4006c7

Please sign in to comment.