From b62f4ffa3f16f0256f2f3496744b8ff3aadd4422 Mon Sep 17 00:00:00 2001 From: Rob Date: Wed, 10 Jul 2024 13:16:34 -0400 Subject: [PATCH 1/6] add `try_send` --- src/channel/bounded.rs | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/src/channel/bounded.rs b/src/channel/bounded.rs index b1e8e04..2398727 100644 --- a/src/channel/bounded.rs +++ b/src/channel/bounded.rs @@ -47,7 +47,7 @@ mod inner { /// inner module, used to group feature-specific imports #[cfg(async_channel_impl = "flume")] mod inner { - pub use flume::{RecvError, SendError, TryRecvError}; + pub use flume::{RecvError, SendError, TryRecvError, TrySendError}; use flume::{r#async::RecvStream, Receiver as InnerReceiver, Sender as InnerSender}; @@ -77,7 +77,7 @@ mod inner { /// inner module, used to group feature-specific imports #[cfg(not(any(async_channel_impl = "flume", async_channel_impl = "tokio")))] mod inner { - pub use async_std::channel::{RecvError, SendError, TryRecvError}; + pub use async_std::channel::{RecvError, SendError, TryRecvError, TrySendError}; use async_std::channel::{Receiver as InnerReceiver, Sender as InnerSender}; @@ -121,6 +121,20 @@ impl Sender { result } + + /// Try to send a value over the channel. Will return immediately if the channel is full. + /// + /// # Errors + /// - If the channel is full + /// - If the channel is dropped + pub async fn try_send(&self, msg: T) -> Result<(), TrySendError> { + #[cfg(async_channel_impl = "flume")] + let result = self.0.try_send(msg); + #[cfg(not(all(async_channel_impl = "flume")))] + let result = self.0.try_send(msg).await; + + result + } } impl Receiver { From 9beb1175362bf34446c663f46f859148f8d729ab Mon Sep 17 00:00:00 2001 From: Rob Date: Wed, 10 Jul 2024 13:20:13 -0400 Subject: [PATCH 2/6] fix compile --- src/channel/bounded.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/channel/bounded.rs b/src/channel/bounded.rs index 2398727..b7d0f6f 100644 --- a/src/channel/bounded.rs +++ b/src/channel/bounded.rs @@ -127,11 +127,11 @@ impl Sender { /// # Errors /// - If the channel is full /// - If the channel is dropped - pub async fn try_send(&self, msg: T) -> Result<(), TrySendError> { + pub fn try_send(&self, msg: T) -> Result<(), TrySendError> { #[cfg(async_channel_impl = "flume")] let result = self.0.try_send(msg); #[cfg(not(all(async_channel_impl = "flume")))] - let result = self.0.try_send(msg).await; + let result = self.0.try_send(msg); result } From 1a0a3dbaa24a93abc1dcddbed2cf2855a3b665d3 Mon Sep 17 00:00:00 2001 From: Rob Date: Wed, 10 Jul 2024 13:22:27 -0400 Subject: [PATCH 3/6] add `trysenderror` to exports --- src/channel.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/channel.rs b/src/channel.rs index cf2d1e6..065b857 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -26,7 +26,7 @@ compile_error!( "async_channel_impl = 'tokio' requires tokio runtime, e. g. async_executor_impl = 'tokio'" ); -pub use bounded::{bounded, BoundedStream, Receiver, RecvError, SendError, Sender, TryRecvError}; +pub use bounded::{bounded, BoundedStream, Receiver, RecvError, SendError, Sender, TryRecvError, TrySendError}; pub use oneshot::{oneshot, OneShotReceiver, OneShotRecvError, OneShotSender, OneShotTryRecvError}; pub use unbounded::{ unbounded, UnboundedReceiver, UnboundedRecvError, UnboundedSendError, UnboundedSender, From 6cb6342e19337dfed8732357a9007b641e1692dd Mon Sep 17 00:00:00 2001 From: Rob Date: Wed, 10 Jul 2024 13:23:35 -0400 Subject: [PATCH 4/6] fmt --- src/channel.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/channel.rs b/src/channel.rs index 065b857..54063ad 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -26,7 +26,9 @@ compile_error!( "async_channel_impl = 'tokio' requires tokio runtime, e. g. async_executor_impl = 'tokio'" ); -pub use bounded::{bounded, BoundedStream, Receiver, RecvError, SendError, Sender, TryRecvError, TrySendError}; +pub use bounded::{ + bounded, BoundedStream, Receiver, RecvError, SendError, Sender, TryRecvError, TrySendError, +}; pub use oneshot::{oneshot, OneShotReceiver, OneShotRecvError, OneShotSender, OneShotTryRecvError}; pub use unbounded::{ unbounded, UnboundedReceiver, UnboundedRecvError, UnboundedSendError, UnboundedSender, From a4337ccdd8223bf640953f228c7ecf21ea505f41 Mon Sep 17 00:00:00 2001 From: Rob Date: Wed, 10 Jul 2024 13:27:43 -0400 Subject: [PATCH 5/6] fix tokio --- src/channel/bounded.rs | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/src/channel/bounded.rs b/src/channel/bounded.rs index b7d0f6f..756bf7f 100644 --- a/src/channel/bounded.rs +++ b/src/channel/bounded.rs @@ -5,7 +5,7 @@ use futures::Stream; /// inner module, used to group feature-specific imports #[cfg(async_channel_impl = "tokio")] mod inner { - pub use tokio::sync::mpsc::error::{SendError, TryRecvError}; + pub use tokio::sync::mpsc::error::{SendError, TryRecvError, TrySendError}; use tokio::sync::mpsc::{Receiver as InnerReceiver, Sender as InnerSender}; @@ -128,12 +128,7 @@ impl Sender { /// - If the channel is full /// - If the channel is dropped pub fn try_send(&self, msg: T) -> Result<(), TrySendError> { - #[cfg(async_channel_impl = "flume")] - let result = self.0.try_send(msg); - #[cfg(not(all(async_channel_impl = "flume")))] - let result = self.0.try_send(msg); - - result + self.0.try_send(msg) } } From e1b09e87968014fc983a9557e25f2918457ceaa1 Mon Sep 17 00:00:00 2001 From: Rob Date: Wed, 10 Jul 2024 13:30:50 -0400 Subject: [PATCH 6/6] version bump --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 63145b5..5d337ef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ name = "async-compatibility-layer" description = "an abstraction layer for using both async-std and tokio" authors = ["Espresso Systems "] -version = "1.2.0" +version = "1.2.1" edition = "2021" license = "MIT"