Skip to content

Commit

Permalink
Shutdown service on error and on worker shutdown (#376)
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 authored Jun 27, 2024
1 parent e3dd4b3 commit 192bdc8
Show file tree
Hide file tree
Showing 11 changed files with 123 additions and 85 deletions.
4 changes: 4 additions & 0 deletions ntex-server/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [2.1.0] - 2024-06-27

* Shutdown service on error and on worker shutdown

## [2.0.0] - 2024-05-28

* Use async fn for Service::ready() and Service::shutdown()
Expand Down
8 changes: 4 additions & 4 deletions ntex-server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-server"
version = "2.0.0"
version = "2.1.0"
authors = ["ntex contributors <[email protected]>"]
description = "Server for ntex framework"
keywords = ["network", "framework", "async", "futures"]
Expand All @@ -17,10 +17,10 @@ path = "src/lib.rs"

[dependencies]
ntex-bytes = "0.1"
ntex-net = "2.0"
ntex-service = "3.0"
ntex-net = "2"
ntex-service = "3"
ntex-rt = "0.4"
ntex-util = "2.0"
ntex-util = "2"

async-channel = "2"
async-broadcast = "0.7"
Expand Down
21 changes: 3 additions & 18 deletions ntex-server/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
#![deny(rust_2018_idioms, unreachable_pub)]
#![deny(rust_2018_idioms, unreachable_pub, missing_debug_implementations)]
#![allow(clippy::let_underscore_future)]

use ntex_service::ServiceFactory;
use ntex_util::time::Millis;

mod manager;
pub mod net;
Expand All @@ -13,10 +12,8 @@ mod wrk;

pub use self::pool::WorkerPool;
pub use self::server::Server;
pub use self::wrk::{Worker, WorkerStatus, WorkerStop};

#[doc(hidden)]
pub use self::signals::{signal, Signal};
pub use self::wrk::{Worker, WorkerStatus, WorkerStop};

/// Worker id
#[derive(Default, Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
Expand All @@ -30,23 +27,11 @@ impl WorkerId {
}
}

#[non_exhaustive]
#[derive(Debug)]
/// Worker message
pub enum WorkerMessage<T> {
/// New item received
New(T),
/// Graceful shutdown in millis
Shutdown(Millis),
/// Force shutdown
ForceShutdown,
}

#[allow(async_fn_in_trait)]
/// Worker service factory.
pub trait ServerConfiguration: Send + Clone + 'static {
type Item: Send + 'static;
type Factory: ServiceFactory<WorkerMessage<Self::Item>> + 'static;
type Factory: ServiceFactory<Self::Item> + 'static;

/// Create service factory for handling `WorkerMessage<T>` messages.
async fn create(&self) -> Result<Self::Factory, ()>;
Expand Down
23 changes: 22 additions & 1 deletion ntex-server/src/net/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ use super::config::{Config, ServiceConfig};
use super::factory::{self, FactoryServiceType, OnWorkerStart, OnWorkerStartWrapper};
use super::{socket::Listener, Connection, ServerStatus, StreamServer, Token};

/// Server builder
/// Streaming service builder
///
/// This type can be used to construct an instance of `net streaming server` through a
/// builder-like pattern.
pub struct ServerBuilder {
token: Token,
backlog: i32,
Expand All @@ -30,6 +33,18 @@ impl Default for ServerBuilder {
}
}

impl fmt::Debug for ServerBuilder {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ServerBuilder")
.field("token", &self.token)
.field("backlog", &self.backlog)
.field("sockets", &self.sockets)
.field("accept", &self.accept)
.field("worker-pool", &self.pool)
.finish()
}
}

impl ServerBuilder {
/// Create new Server builder instance
pub fn new() -> ServerBuilder {
Expand Down Expand Up @@ -383,4 +398,10 @@ mod tests {
let addrs: Vec<net::SocketAddr> = Vec::new();
assert!(bind_addr(&addrs[..], 10).is_err());
}

#[test]
fn test_debug() {
let builder = ServerBuilder::default();
assert!(format!("{:?}", builder).contains("ServerBuilder"));
}
}
13 changes: 12 additions & 1 deletion ntex-server/src/net/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ impl Config {
}
}

#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct ServiceConfig(pub(super) Rc<RefCell<ServiceConfigInner>>);

#[derive(Debug)]
struct Socket {
name: String,
sockets: Vec<(Token, Listener, &'static str)>,
Expand All @@ -55,6 +56,16 @@ pub(super) struct ServiceConfigInner {
backlog: i32,
}

impl fmt::Debug for ServiceConfigInner {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ServiceConfigInner")
.field("token", &self.token)
.field("backlog", &self.backlog)
.field("sockets", &self.sockets)
.finish()
}
}

impl ServiceConfig {
pub(super) fn new(token: Token, backlog: i32) -> Self {
ServiceConfig(Rc::new(RefCell::new(ServiceConfigInner {
Expand Down
2 changes: 2 additions & 0 deletions ntex-server/src/net/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ use std::{cell::Cell, future::poll_fn, rc::Rc, task, task::Poll};

use ntex_util::task::LocalWaker;

#[derive(Debug)]
/// Simple counter with ability to notify task on reaching specific number
///
/// Counter could be cloned, total count is shared across all clones.
pub(super) struct Counter(Rc<CounterInner>);

#[derive(Debug)]
struct CounterInner {
count: Cell<usize>,
capacity: usize,
Expand Down
1 change: 1 addition & 0 deletions ntex-server/src/net/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use super::{Config, Token};
pub(super) type BoxServerService = boxed::BoxServiceFactory<(), Io, (), (), ()>;
pub(crate) type FactoryServiceType = Box<dyn FactoryService>;

#[derive(Debug)]
pub(crate) struct NetService {
pub(crate) tokens: Vec<(Token, &'static str)>,
pub(crate) factory: BoxServerService,
Expand Down
2 changes: 1 addition & 1 deletion ntex-server/src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ mod test;
pub use self::accept::{AcceptLoop, AcceptNotify, AcceptorCommand};
pub use self::builder::{bind_addr, create_tcp_listener, ServerBuilder};
pub use self::config::{Config, ServiceConfig, ServiceRuntime};
pub use self::service::{ServerMessage, StreamServer};
pub use self::service::StreamServer;
pub use self::socket::{Connection, Stream};
pub use self::test::{build_test_server, test_server, TestServer};

Expand Down
62 changes: 33 additions & 29 deletions ntex-server/src/net/service.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
use std::fmt;

use ntex_bytes::{Pool, PoolRef};
use ntex_net::Io;
use ntex_service::{boxed, Service, ServiceCtx, ServiceFactory};
use ntex_util::HashMap;
use ntex_util::{future::join_all, HashMap};

use crate::{ServerConfiguration, WorkerMessage};
use crate::ServerConfiguration;

use super::accept::{AcceptNotify, AcceptorCommand};
use super::counter::Counter;
use super::factory::{FactoryServiceType, NetService, OnWorkerStart};
use super::{socket::Connection, Token, MAX_CONNS_COUNTER};

pub type ServerMessage = WorkerMessage<Connection>;

pub(super) type BoxService = boxed::BoxService<Io, (), ()>;

/// Net streaming server
pub struct StreamServer {
notify: AcceptNotify,
services: Vec<FactoryServiceType>,
Expand All @@ -34,6 +35,14 @@ impl StreamServer {
}
}

impl fmt::Debug for StreamServer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("StreamServer")
.field("services", &self.services.len())
.finish()
}
}

/// Worker service factory.
impl ServerConfiguration for StreamServer {
type Item = Connection;
Expand Down Expand Up @@ -88,11 +97,12 @@ impl Clone for StreamServer {
}
}

#[derive(Debug)]
pub struct StreamService {
services: Vec<NetService>,
}

impl ServiceFactory<ServerMessage> for StreamService {
impl ServiceFactory<Connection> for StreamService {
type Response = ();
type Error = ();
type Service = StreamServiceImpl;
Expand Down Expand Up @@ -130,13 +140,14 @@ impl ServiceFactory<ServerMessage> for StreamService {
}
}

#[derive(Debug)]
pub struct StreamServiceImpl {
tokens: HashMap<Token, (usize, &'static str, Pool, PoolRef)>,
services: Vec<BoxService>,
conns: Counter,
}

impl Service<ServerMessage> for StreamServiceImpl {
impl Service<Connection> for StreamServiceImpl {
type Response = ();
type Error = ();

Expand All @@ -161,35 +172,28 @@ impl Service<ServerMessage> for StreamServiceImpl {
}

async fn shutdown(&self) {
for svc in &self.services {
svc.shutdown().await;
}
let _ = join_all(self.services.iter().map(|svc| svc.shutdown())).await;
log::info!(
"Worker service shutdown, {} connections",
super::num_connections()
);
}

async fn call(&self, req: ServerMessage, ctx: ServiceCtx<'_, Self>) -> Result<(), ()> {
match req {
ServerMessage::New(con) => {
if let Some((idx, tag, _, pool)) = self.tokens.get(&con.token) {
let stream: Io<_> = con.io.try_into().map_err(|e| {
log::error!("Cannot convert to an async io stream: {}", e);
})?;

stream.set_tag(tag);
stream.set_memory_pool(*pool);
let guard = self.conns.get();
let _ = ctx.call(&self.services[*idx], stream).await;
drop(guard);
Ok(())
} else {
log::error!("Cannot get handler service for connection: {:?}", con);
Err(())
}
}
_ => Ok(()),
async fn call(&self, con: Connection, ctx: ServiceCtx<'_, Self>) -> Result<(), ()> {
if let Some((idx, tag, _, pool)) = self.tokens.get(&con.token) {
let stream: Io<_> = con.io.try_into().map_err(|e| {
log::error!("Cannot convert to an async io stream: {}", e);
})?;

stream.set_tag(tag);
stream.set_memory_pool(*pool);
let guard = self.conns.get();
let _ = ctx.call(&self.services[*idx], stream).await;
drop(guard);
Ok(())
} else {
log::error!("Cannot get handler service for connection: {:?}", con);
Err(())
}
}
}
Loading

0 comments on commit 192bdc8

Please sign in to comment.