Skip to content

Commit

Permalink
add ClientConnector with default implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
UkoeHB committed Oct 6, 2023
1 parent a636422 commit 030380f
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 21 deletions.
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ tokio-rustls = { version = "0.24.1", optional = true }
tokio-native-tls = { version = "0.3.1", optional = true }

[features]
default = ["client", "server"]
default = ["native_client", "server"]

client = ["tokio-tungstenite"]
client = ["tokio-tungstenite-wasm"]
native_client = ["client", "tokio-tungstenite"]

tungstenite_common = ["tokio-tungstenite"]

Expand Down
68 changes: 49 additions & 19 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,15 @@ use crate::socket::{InMessage, MessageSignal, SocketConfig};
use crate::CloseFrame;
use crate::Error;
use crate::Message;
use crate::RawMessage;
use crate::Request;
use crate::Socket;
use async_trait::async_trait;
use base64::Engine;
use enfync::TryAdopt;
use enfync::Handle;
use http::header::HeaderName;
use http::HeaderValue;
use futures::{SinkExt, StreamExt};
use std::fmt;
use std::future::Future;
use std::time::Duration;
Expand Down Expand Up @@ -250,6 +252,34 @@ pub trait ClientExt: Send {
}
}

/// Abstract interface used by clients to connect to servers.
///
/// The connector must expose a handle representing the runtime that the client will run on. The runtime should
/// be compatible with the connection method (e.g. `tokio` for `tokio_tungstenite::connect()`,
/// `wasm_bindgen_futures::spawn_local()` for a WASM connector, etc.).
#[async_trait]
pub trait ClientConnector {
type Handle: enfync::Handle;
type Message: Into<RawMessage> + From<RawMessage> + std::fmt::Debug + Send + 'static;
type WSError: std::error::Error + Into<WSError>;
type Socket:
SinkExt<Self::Message, Error = Self::WSError> +
StreamExt<Item = Result<Self::Message, Self::WSError>> +
Unpin +
Send +
'static;
type ConnectError: std::error::Error + Send;

/// Get the connector's runtime handle.
fn handle(&self) -> Self::Handle;

/// Connect to a websocket server.
///
/// Returns `Err` if the request is invalid.
async fn connect(&self, request: Request) -> Result<Self::Socket, Self::ConnectError>;
}

/// An `ezsockets` client.
#[derive(Debug)]
pub struct Client<E: ClientExt> {
to_socket_sender: mpsc::UnboundedSender<InMessage>,
Expand Down Expand Up @@ -345,24 +375,24 @@ impl<E: ClientExt> Client<E> {
}

/// Connect to a websocket server using the default client connector.
/// - Requires a tokio runtime.
/// - Requires feature `native_client`.
/// - May only be invoked from within a tokio runtime.
#[cfg(feature = "native_client")]
pub async fn connect<E: ClientExt + 'static>(
client_fn: impl FnOnce(Client<E>) -> E,
config: ClientConfig,
) -> (Client<E>, impl Future<Output = Result<(), Error>>) {
let client_connector = enfync::builtin::native::TokioHandle::try_adopt()
.expect("ezsockets::client::connect() only works with tokio runtimes; use connect_with() instead");
let client_connector = crate::client_connector_tokio::ClientConnectorTokio::default();
let (handle, mut future) = connect_with(client_fn, config, client_connector);
let future = async move { future.extract().await.unwrap_or(Err("client actor crashed".into())) };
(handle, future)
}

/// Connect to a websocket server with the provided client connector.
/// - TODO: add ClientConnector trait (currently uses default client connector: tokio-tungstenite in tokio runtime)
pub fn connect_with<E: ClientExt + 'static>(
client_fn: impl FnOnce(Client<E>) -> E,
config: ClientConfig,
client_connector: impl enfync::Handle + Send + Sync + 'static,
client_connector: impl ClientConnector + Send + Sync + 'static,
) -> (Client<E>, enfync::PendingResult<Result<(), Error>>) {
let (to_socket_sender, mut to_socket_receiver) = mpsc::unbounded_channel();
let (client_call_sender, client_call_receiver) = mpsc::unbounded_channel();
Expand All @@ -371,13 +401,13 @@ pub fn connect_with<E: ClientExt + 'static>(
client_call_sender,
};
let mut client = client_fn(handle.clone());
let client_connector_clone = client_connector.clone();
let future = client_connector.spawn(async move {
let runtime_handle = client_connector.handle();
let future = runtime_handle.spawn(async move {
tracing::info!("connecting to {}...", config.url);
let Some(socket) = client_connect(
config.max_initial_connect_attempts,
&config,
client_connector_clone.clone(),
&client_connector,
&mut to_socket_receiver,
&mut client,
)
Expand All @@ -393,15 +423,15 @@ pub fn connect_with<E: ClientExt + 'static>(
client_call_receiver,
socket,
config,
client_connector: client_connector_clone,
client_connector,
};
actor.run().await?;
Ok(())
});
(handle, future)
}

struct ClientActor<E: ClientExt, C: enfync::Handle> {
struct ClientActor<E: ClientExt, C: ClientConnector> {
client: E,
to_socket_receiver: mpsc::UnboundedReceiver<InMessage>,
client_call_receiver: mpsc::UnboundedReceiver<E::Call>,
Expand All @@ -410,7 +440,7 @@ struct ClientActor<E: ClientExt, C: enfync::Handle> {
client_connector: C,
}

impl<E: ClientExt, C: enfync::Handle> ClientActor<E, C> {
impl<E: ClientExt, C: ClientConnector> ClientActor<E, C> {
async fn run(&mut self) -> Result<(), Error> {
loop {
tokio::select! {
Expand Down Expand Up @@ -460,7 +490,7 @@ impl<E: ClientExt, C: enfync::Handle> ClientActor<E, C> {
let Some(socket) = client_connect(
self.config.max_reconnect_attempts,
&self.config,
self.client_connector.clone(),
&self.client_connector,
&mut self.to_socket_receiver,
&mut self.client,
).await? else {
Expand All @@ -485,7 +515,7 @@ impl<E: ClientExt, C: enfync::Handle> ClientActor<E, C> {
let Some(socket) = client_connect(
self.config.max_reconnect_attempts,
&self.config,
self.client_connector.clone(),
&self.client_connector,
&mut self.to_socket_receiver,
&mut self.client,
).await? else {
Expand All @@ -507,20 +537,20 @@ impl<E: ClientExt, C: enfync::Handle> ClientActor<E, C> {
}

/// Returns Ok(Some(socket)) if connecting succeeded, Ok(None) if the client closed itself, and `Err` if an error occurred.
async fn client_connect<E: ClientExt>(
async fn client_connect<E: ClientExt, Connector: ClientConnector>(
max_attempts: usize,
config: &ClientConfig,
client_connector: impl enfync::Handle,
client_connector: &Connector,
to_socket_receiver: &mut mpsc::UnboundedReceiver<InMessage>,
client: &mut E,
) -> Result<Option<Socket>, Error> {
for i in 1.. {
// connection attempt
tracing::info!("connecting attempt no: {}...", i);
let connect_http_request = config.connect_http_request();
let result = tokio_tungstenite::connect_async(connect_http_request).await; //todo: ClientConnector::connect()
let result = client_connector.connect(connect_http_request).await;
match result {
Ok((socket_impl, _)) => {
Ok(socket_impl) => {
tracing::info!("successfully connected");
if let Err(err) = client.on_connect().await {
tracing::error!("calling on_connect() failed due to {}, closing client", err);
Expand All @@ -529,7 +559,7 @@ async fn client_connect<E: ClientExt>(
let socket = Socket::new(
socket_impl,
config.socket_config.clone().unwrap_or_default(),
client_connector,
client_connector.handle(),
);
return Ok(Some(socket));
}
Expand Down
46 changes: 46 additions & 0 deletions src/client_connectors/client_connector_tokio.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use crate::client::ClientConnector;
use crate::Request;
use enfync::TryAdopt;
use tokio_tungstenite::tungstenite;

/// Implementation of [`ClientConnector`] for tokio runtimes.
#[derive(Clone)]
pub struct ClientConnectorTokio {
handle: enfync::builtin::native::TokioHandle,
}

impl ClientConnectorTokio {
pub fn new(handle: tokio::runtime::Handle) -> Self {
Self { handle: handle.into() }
}
}

impl Default for ClientConnectorTokio {
fn default() -> Self {
let handle = enfync::builtin::native::TokioHandle::try_adopt()
.expect("ClientConnectorTokio::default() only works inside a tokio runtime; use ClientConnectorTokionew() instead");
Self { handle }
}
}

#[async_trait::async_trait]
impl ClientConnector for ClientConnectorTokio {
type Handle = enfync::builtin::native::TokioHandle;
type Message = tungstenite::Message;
type WSError = tungstenite::error::Error;
type Socket = tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>;
type ConnectError = tungstenite::error::Error;

/// Get the connector's runtime handle.
fn handle(&self) -> Self::Handle {
self.handle.clone()
}

/// Connect to a websocket server.
///
/// Returns `Err` if the request is invalid.
async fn connect(&self, request: Request) -> Result<Self::Socket, Self::ConnectError> {
let (socket, _) = tokio_tungstenite::connect_async(request).await?;
Ok(socket)
}
}
6 changes: 6 additions & 0 deletions src/client_connectors/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@

#[cfg(feature = "native_client")]
pub mod client_connector_tokio;

//#[cfg(feature = "wasm_client")]
//pub mod client_connector_wasm;
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
//!
//! Refer to [`client`] or [`server`] module for detailed implementation guides.
mod client_connectors;
mod server_runners;
mod socket;

pub use client_connectors::*;
pub use server_runners::*;

pub use socket::CloseCode;
Expand Down

0 comments on commit 030380f

Please sign in to comment.