Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ClientConnector abstraction for cross-platform clients #90

Merged
merged 4 commits into from
Oct 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- Move `axum` and `tungstenite` server runners into new submodule `src/server_runners`.
- Update to `tokio-tungstenite` v0.20.0.
- Fork [axum-tungstenite](https://crates.io/crates/axum-tungstenite) crate into `src/server_runners` and refactor the `axum` runner to use that instead of `axum::extract::ws`.
- Bug fix: remove race condition between sending a message and a socket connection closing that would cause a client to shut down instead of calling `on_disconnect/on_close`.
- Use [`tokio-tungstenite-wasm`](https://github.com/TannerRogalsky/tokio-tungstenite-wasm) errors internally to better support cross-platform clients.
- Use [`enfync`](https://github.com/UkoeHB/enfync) runtime handles internally to better support cross-platform clients. Default clients continue to use tokio.
- Add `ClientConnector` abstraction for connecting clients and add `ezsockets::client::connect_with`.


Migration guide:
Expand Down
10 changes: 7 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ categories = ["asynchronous", "network-programming", "web-programming::websocket
async-trait = "0.1.52"
atomic_enum = "0.2.0"
base64 = "0.21.0"
enfync = "0.1.0"
futures = "0.3.21"
http = "0.2.8"
tokio = { version = "1.17.0", features = ["sync", "rt", "macros", "time"] }
Expand All @@ -31,18 +32,21 @@ http-body = { version = "0.4.5", optional = true }
hyper = { version = "0.14.23", optional = true }
sha-1 = { version = "0.10.1", optional = true }

tokio-tungstenite-wasm = { version = "0.2.0", optional = true }

tokio-tungstenite = { version = "0.20.0", optional = true }
tokio-rustls = { version = "0.24.1", optional = true }
tokio-native-tls = { version = "0.3.1", optional = true }

[features]
default = ["client", "server"]
default = ["native_client", "server"]

client = ["tokio-tungstenite"]
client = ["tokio-tungstenite-wasm"]
native_client = ["client", "tokio-tungstenite"]

tungstenite_common = ["tokio-tungstenite"]

server = ["tungstenite_common"]
server = ["tungstenite_common", "tokio-tungstenite-wasm"]
tungstenite = ["server"]
axum = ["server", "dep:axum", "axum-core", "bytes", "futures-util", "http-body", "hyper", "sha-1"]

Expand Down
85 changes: 75 additions & 10 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,13 @@ use crate::socket::{InMessage, MessageSignal, SocketConfig};
use crate::CloseFrame;
use crate::Error;
use crate::Message;
use crate::RawMessage;
use crate::Request;
use crate::Socket;
use async_trait::async_trait;
use base64::Engine;
use enfync::Handle;
use futures::{SinkExt, StreamExt};
use http::header::HeaderName;
use http::HeaderValue;
use std::fmt;
Expand All @@ -64,7 +67,7 @@ use std::time::Duration;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio_tungstenite::tungstenite;
use tokio_tungstenite::tungstenite::error::Error as WSError;
use tokio_tungstenite_wasm::Error as WSError;
use url::Url;

pub const DEFAULT_RECONNECT_INTERVAL: Duration = Duration::new(5, 0);
Expand Down Expand Up @@ -249,6 +252,33 @@ pub trait ClientExt: Send {
}
}

/// Abstract interface used by clients to connect to servers.
///
/// The connector must expose a handle representing the runtime that the client will run on. The runtime should
/// be compatible with the connection method (e.g. `tokio` for `tokio_tungstenite::connect()`,
/// `wasm_bindgen_futures::spawn_local()` for a WASM connector, etc.).
#[async_trait]
pub trait ClientConnector {
type Handle: enfync::Handle;
type Message: Into<RawMessage> + From<RawMessage> + std::fmt::Debug + Send + 'static;
type WSError: std::error::Error + Into<WSError>;
type Socket: SinkExt<Self::Message, Error = Self::WSError>
+ StreamExt<Item = Result<Self::Message, Self::WSError>>
+ Unpin
+ Send
+ 'static;
type ConnectError: std::error::Error + Send;

/// Get the connector's runtime handle.
fn handle(&self) -> Self::Handle;

/// Connect to a websocket server.
///
/// Returns `Err` if the request is invalid.
async fn connect(&self, request: Request) -> Result<Self::Socket, Self::ConnectError>;
}

/// An `ezsockets` client.
#[derive(Debug)]
pub struct Client<E: ClientExt> {
to_socket_sender: mpsc::UnboundedSender<InMessage>,
Expand Down Expand Up @@ -343,22 +373,45 @@ impl<E: ClientExt> Client<E> {
}
}

/// Connect to a websocket server using the default client connector.
/// - Requires feature `native_client`.
/// - May only be invoked from within a tokio runtime.
#[cfg(feature = "native_client")]
pub async fn connect<E: ClientExt + 'static>(
client_fn: impl FnOnce(Client<E>) -> E,
config: ClientConfig,
) -> (Client<E>, impl Future<Output = Result<(), Error>>) {
let client_connector = crate::client_connector_tokio::ClientConnectorTokio::default();
let (handle, mut future) = connect_with(client_fn, config, client_connector);
let future = async move {
future
.extract()
.await
.unwrap_or(Err("client actor crashed".into()))
};
(handle, future)
}

/// Connect to a websocket server with the provided client connector.
pub fn connect_with<E: ClientExt + 'static>(
client_fn: impl FnOnce(Client<E>) -> E,
config: ClientConfig,
client_connector: impl ClientConnector + Send + Sync + 'static,
) -> (Client<E>, enfync::PendingResult<Result<(), Error>>) {
let (to_socket_sender, mut to_socket_receiver) = mpsc::unbounded_channel();
let (client_call_sender, client_call_receiver) = mpsc::unbounded_channel();
let handle = Client {
to_socket_sender,
client_call_sender,
};
let mut client = client_fn(handle.clone());
let future = tokio::spawn(async move {
let runtime_handle = client_connector.handle();
let future = runtime_handle.spawn(async move {
tracing::info!("connecting to {}...", config.url);
let Some(socket) = client_connect(
config.max_initial_connect_attempts,
&config,
&client_connector,
&mut to_socket_receiver,
&mut client,
)
Expand All @@ -374,30 +427,35 @@ pub async fn connect<E: ClientExt + 'static>(
client_call_receiver,
socket,
config,
client_connector,
};
actor.run().await?;
Ok(())
});
let future = async move { future.await.unwrap_or(Err("client actor crashed".into())) };
(handle, future)
}

struct ClientActor<E: ClientExt> {
struct ClientActor<E: ClientExt, C: ClientConnector> {
client: E,
to_socket_receiver: mpsc::UnboundedReceiver<InMessage>,
client_call_receiver: mpsc::UnboundedReceiver<E::Call>,
socket: Socket,
config: ClientConfig,
client_connector: C,
}

impl<E: ClientExt> ClientActor<E> {
impl<E: ClientExt, C: ClientConnector> ClientActor<E, C> {
async fn run(&mut self) -> Result<(), Error> {
loop {
tokio::select! {
Some(inmessage) = self.to_socket_receiver.recv() => {
let closed_self = matches!(inmessage.message, Some(Message::Close(_)));
if self.socket.send(inmessage).await.is_err() {
match self.socket.await_sink_close().await {
let result = self.socket.await_sink_close().await;
if let Err(err) = &result {
tracing::warn!(?err, "encountered sink closing error when trying to send a message");
}
match result {
Err(WSError::ConnectionClosed) |
Err(WSError::AlreadyClosed) |
Err(WSError::Io(_)) |
Expand Down Expand Up @@ -436,6 +494,7 @@ impl<E: ClientExt> ClientActor<E> {
let Some(socket) = client_connect(
self.config.max_reconnect_attempts,
&self.config,
&self.client_connector,
&mut self.to_socket_receiver,
&mut self.client,
).await? else {
Expand All @@ -460,6 +519,7 @@ impl<E: ClientExt> ClientActor<E> {
let Some(socket) = client_connect(
self.config.max_reconnect_attempts,
&self.config,
&self.client_connector,
&mut self.to_socket_receiver,
&mut self.client,
).await? else {
Expand All @@ -481,25 +541,30 @@ impl<E: ClientExt> ClientActor<E> {
}

/// Returns Ok(Some(socket)) if connecting succeeded, Ok(None) if the client closed itself, and `Err` if an error occurred.
async fn client_connect<E: ClientExt>(
async fn client_connect<E: ClientExt, Connector: ClientConnector>(
max_attempts: usize,
config: &ClientConfig,
client_connector: &Connector,
to_socket_receiver: &mut mpsc::UnboundedReceiver<InMessage>,
client: &mut E,
) -> Result<Option<Socket>, Error> {
for i in 1.. {
// connection attempt
tracing::info!("connecting attempt no: {}...", i);
let connect_http_request = config.connect_http_request();
let result = tokio_tungstenite::connect_async(connect_http_request).await;
let result = client_connector.connect(connect_http_request).await;
match result {
Ok((socket, _)) => {
Ok(socket_impl) => {
tracing::info!("successfully connected");
if let Err(err) = client.on_connect().await {
tracing::error!("calling on_connect() failed due to {}, closing client", err);
return Err(err);
}
let socket = Socket::new(socket, config.socket_config.clone().unwrap_or_default());
let socket = Socket::new(
socket_impl,
config.socket_config.clone().unwrap_or_default(),
client_connector.handle(),
);
return Ok(Some(socket));
}
Err(err) => {
Expand Down
52 changes: 52 additions & 0 deletions src/client_connectors/client_connector_tokio.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
use crate::client::ClientConnector;
use crate::Request;
use enfync::TryAdopt;
use tokio_tungstenite::tungstenite;

/// Implementation of [`ClientConnector`] for tokio runtimes.
#[derive(Clone)]
pub struct ClientConnectorTokio {
handle: enfync::builtin::native::TokioHandle,
}

impl ClientConnectorTokio {
pub fn new(handle: tokio::runtime::Handle) -> Self {
Self {
handle: handle.into(),
}
}
}

impl Default for ClientConnectorTokio {
fn default() -> Self {
let handle = enfync::builtin::native::TokioHandle::try_adopt()
.expect(
"ClientConnectorTokio::default() only works inside a tokio runtime; use ClientConnectorTokio::new() instead"
);
Self { handle }
}
}

#[async_trait::async_trait]
impl ClientConnector for ClientConnectorTokio {
type Handle = enfync::builtin::native::TokioHandle;
type Message = tungstenite::Message;
type WSError = tungstenite::error::Error;
type Socket = tokio_tungstenite::WebSocketStream<
tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
>;
type ConnectError = tungstenite::error::Error;

/// Get the connector's runtime handle.
fn handle(&self) -> Self::Handle {
self.handle.clone()
}

/// Connect to a websocket server.
///
/// Returns `Err` if the request is invalid.
async fn connect(&self, request: Request) -> Result<Self::Socket, Self::ConnectError> {
let (socket, _) = tokio_tungstenite::connect_async(request).await?;
Ok(socket)
}
}
5 changes: 5 additions & 0 deletions src/client_connectors/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#[cfg(feature = "native_client")]
pub mod client_connector_tokio;

//#[cfg(feature = "wasm_client")]
//pub mod client_connector_wasm;
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
//!
//! Refer to [`client`] or [`server`] module for detailed implementation guides.

mod client_connectors;
mod server_runners;
mod socket;

pub use client_connectors::*;
pub use server_runners::*;

pub use socket::CloseCode;
Expand Down
5 changes: 4 additions & 1 deletion src/server_runners/axum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ use async_trait::async_trait;
use axum::extract::ConnectInfo;
use axum::extract::FromRequest;
use axum::response::Response;
use enfync::TryAdopt;
use std::net::SocketAddr;

/// Extractor for establishing WebSocket connections.
Expand Down Expand Up @@ -151,7 +152,9 @@ impl Upgrade {
socket_config: SocketConfig,
) -> Response {
self.ws.on_upgrade(move |socket| async move {
let socket = Socket::new(socket, socket_config);
let handle = enfync::builtin::native::TokioHandle::try_adopt()
.expect("axum server runner only works in a tokio runtime");
let socket = Socket::new(socket, socket_config, handle);
server.accept(socket, self.request, self.address);
})
}
Expand Down
Loading
Loading