Skip to content

Commit

Permalink
update changelog
Browse files Browse the repository at this point in the history
  • Loading branch information
UkoeHB committed Oct 6, 2023
1 parent 030380f commit ae604d6
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 23 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- Move `axum` and `tungstenite` server runners into new submodule `src/server_runners`.
- Update to `tokio-tungstenite` v0.20.0.
- Fork [axum-tungstenite](https://crates.io/crates/axum-tungstenite) crate into `src/server_runners` and refactor the `axum` runner to use that instead of `axum::extract::ws`.
- Bug fix: remove race condition between sending a message and a socket connection closing that would cause a client to shut down instead of calling `on_disconnect/on_close`.
- Use [`tokio-tungstenite-wasm`](https://github.com/TannerRogalsky/tokio-tungstenite-wasm) errors internally to better support cross-platform clients.
- Use [`enfync`](https://github.com/UkoeHB/enfync) runtime handles internally to better support cross-platform clients. Default clients continue to use tokio.
- Add `ClientConnector` abstraction for connecting clients and add `ezsockets::client::connect_with`.


Migration guide:
Expand Down
8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ categories = ["asynchronous", "network-programming", "web-programming::websocket
async-trait = "0.1.52"
atomic_enum = "0.2.0"
base64 = "0.21.0"
enfync = { git = "https://github.com/UkoeHB/enfync", rev = "010e30f" }
enfync = "0.1.0"
futures = "0.3.21"
http = "0.2.8"
tokio = { version = "1.17.0", features = ["sync", "rt", "macros", "time"] }
Expand Down Expand Up @@ -51,9 +51,9 @@ server = ["tungstenite_common", "tokio-tungstenite-wasm"]
tungstenite = ["server"]
axum = ["server", "dep:axum", "axum-core", "bytes", "futures-util", "http-body", "hyper", "sha-1"]

tls = ["server"]
native-tls = ["tls", "tokio-native-tls", "tokio-tungstenite/native-tls" ]
rustls = ["tls", "tokio-rustls", "tokio-tungstenite/rustls-tls-webpki-roots" ]
tls = []
native-tls = ["tls", "tokio-native-tls", "tokio-tungstenite/native-tls"]
rustls = ["tls", "tokio-rustls", "tokio-tungstenite/rustls-tls-webpki-roots"]

[dev-dependencies]
tokio = { version = "1.17.0", features = ["full"] }
Expand Down
20 changes: 12 additions & 8 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ use crate::Socket;
use async_trait::async_trait;
use base64::Engine;
use enfync::Handle;
use futures::{SinkExt, StreamExt};
use http::header::HeaderName;
use http::HeaderValue;
use futures::{SinkExt, StreamExt};
use std::fmt;
use std::future::Future;
use std::time::Duration;
Expand Down Expand Up @@ -262,12 +262,11 @@ pub trait ClientConnector {
type Handle: enfync::Handle;
type Message: Into<RawMessage> + From<RawMessage> + std::fmt::Debug + Send + 'static;
type WSError: std::error::Error + Into<WSError>;
type Socket:
SinkExt<Self::Message, Error = Self::WSError> +
StreamExt<Item = Result<Self::Message, Self::WSError>> +
Unpin +
Send +
'static;
type Socket: SinkExt<Self::Message, Error = Self::WSError>
+ StreamExt<Item = Result<Self::Message, Self::WSError>>
+ Unpin
+ Send
+ 'static;
type ConnectError: std::error::Error + Send;

/// Get the connector's runtime handle.
Expand Down Expand Up @@ -384,7 +383,12 @@ pub async fn connect<E: ClientExt + 'static>(
) -> (Client<E>, impl Future<Output = Result<(), Error>>) {
let client_connector = crate::client_connector_tokio::ClientConnectorTokio::default();
let (handle, mut future) = connect_with(client_fn, config, client_connector);
let future = async move { future.extract().await.unwrap_or(Err("client actor crashed".into())) };
let future = async move {
future
.extract()
.await
.unwrap_or(Err("client actor crashed".into()))
};
(handle, future)
}

Expand Down
14 changes: 10 additions & 4 deletions src/client_connectors/client_connector_tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,30 @@ pub struct ClientConnectorTokio {

impl ClientConnectorTokio {
pub fn new(handle: tokio::runtime::Handle) -> Self {
Self { handle: handle.into() }
Self {
handle: handle.into(),
}
}
}

impl Default for ClientConnectorTokio {
fn default() -> Self {
let handle = enfync::builtin::native::TokioHandle::try_adopt()
.expect("ClientConnectorTokio::default() only works inside a tokio runtime; use ClientConnectorTokionew() instead");
.expect(
"ClientConnectorTokio::default() only works inside a tokio runtime; use ClientConnectorTokio::new() instead"
);
Self { handle }
}
}

#[async_trait::async_trait]
impl ClientConnector for ClientConnectorTokio {
type Handle = enfync::builtin::native::TokioHandle;
type Handle = enfync::builtin::native::TokioHandle;
type Message = tungstenite::Message;
type WSError = tungstenite::error::Error;
type Socket = tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>;
type Socket = tokio_tungstenite::WebSocketStream<
tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
>;
type ConnectError = tungstenite::error::Error;

/// Get the connector's runtime handle.
Expand Down
1 change: 0 additions & 1 deletion src/client_connectors/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

#[cfg(feature = "native_client")]
pub mod client_connector_tokio;

Expand Down
18 changes: 12 additions & 6 deletions src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ impl Stream {
fn new<M, S>(
stream: S,
last_alive: Arc<Mutex<Instant>>,
handle: impl enfync::Handle
handle: impl enfync::Handle,
) -> (enfync::PendingResult<()>, Self)
where
M: Into<RawMessage> + std::fmt::Debug + Send + 'static,
Expand Down Expand Up @@ -507,7 +507,11 @@ pub struct Socket {
}

impl Socket {
pub fn new<M, E: std::error::Error, S>(socket: S, config: SocketConfig, handle: impl enfync::Handle) -> Self
pub fn new<M, E: std::error::Error, S>(
socket: S,
config: SocketConfig,
handle: impl enfync::Handle,
) -> Self
where
M: Into<RawMessage> + From<RawMessage> + std::fmt::Debug + Send + 'static,
E: Into<WSError>,
Expand All @@ -523,15 +527,17 @@ impl Socket {
);
let (hearbeat_abort_sender, hearbeat_abort_receiver) = oneshot::channel();
let sink_clone = sink.clone();
handle.spawn(async move { socket_heartbeat(sink_clone, config, hearbeat_abort_receiver, last_alive).await });
handle.spawn(async move {
socket_heartbeat(sink_clone, config, hearbeat_abort_receiver, last_alive).await
});

let (sink_result_sender, sink_result_receiver) = oneshot::channel();
handle.spawn(async move {
let _ = stream_future.extract().await;
let _ = sink_abort_sender.send(());
let _ = hearbeat_abort_sender.send(());
let _ =
sink_result_sender.send(sink_future.await.unwrap_or(Err(WSError::AlreadyClosed)));
sink_result_sender.send(sink_future.extract().await.unwrap_or(Err(WSError::AlreadyClosed)));
});

Self {
Expand Down Expand Up @@ -574,7 +580,7 @@ async fn socket_heartbeat(
config: SocketConfig,
mut abort_receiver: oneshot::Receiver<()>,
last_alive: Arc<Mutex<Instant>>,
){
) {
let mut interval = tokio::time::interval(config.heartbeat);

loop {
Expand All @@ -592,7 +598,7 @@ async fn socket_heartbeat(
}
let timestamp = SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or(Duration::default());
.unwrap_or_default();
if sink
.send_raw(InRawMessage::new((config.heartbeat_ping_msg_fn)(timestamp)))
.await
Expand Down

0 comments on commit ae604d6

Please sign in to comment.