diff --git a/Cargo.toml b/Cargo.toml index 0b5e5a1..23953fa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rsmq_async" -version = "7.0.2" +version = "8.0.0" authors = [ "David Bonet " ] diff --git a/src/functions.rs b/src/functions.rs index ac222b2..70bc742 100644 --- a/src/functions.rs +++ b/src/functions.rs @@ -9,6 +9,7 @@ use radix_fmt::radix_36; use rand::seq::IteratorRandom; use redis::{aio::ConnectionLike, pipe, Script}; use std::convert::TryInto; +use std::time::Duration; lazy_static! { static ref CHANGE_MESSAGE_VISIVILITY: Script = @@ -39,11 +40,13 @@ impl RsmqFunctions { conn: &mut T, qname: &str, message_id: &str, - seconds_hidden: u64, + seconds_hidden: Duration, ) -> RsmqResult<()> { + let seconds_hidden = get_redis_duration(Some(seconds_hidden), &Duration::from_secs(30)); + let queue = self.get_queue(conn, qname, false).await?; - number_in_range(seconds_hidden, 0, 9_999_999)?; + number_in_range(seconds_hidden, 0, 9_999_999_000)?; CHANGE_MESSAGE_VISIVILITY .key(format!("{}{}", self.ns, qname)) @@ -66,18 +69,18 @@ impl RsmqFunctions { &self, conn: &mut T, qname: &str, - seconds_hidden: Option, - delay: Option, + seconds_hidden: Option, + delay: Option, maxsize: Option, ) -> RsmqResult<()> { valid_name_format(qname)?; let key = format!("{}{}:Q", self.ns, qname); - let seconds_hidden = seconds_hidden.unwrap_or(30); - let delay = delay.unwrap_or(0); + let seconds_hidden = get_redis_duration(seconds_hidden, &Duration::from_secs(30)); + let delay = get_redis_duration(delay, &Duration::ZERO); let maxsize = maxsize.unwrap_or(65536); - number_in_range(seconds_hidden, 0, 9_999_999)?; + number_in_range(seconds_hidden, 0, 9_999_999_000)?; number_in_range(delay, 0, 9_999_999)?; if let Err(error) = number_in_range(maxsize, 1024, 65536) { if maxsize != -1 { @@ -219,8 +222,18 @@ impl RsmqFunctions { } Ok(RsmqQueueAttributes { - vt: result.0.first().unwrap_or(&Some(0)).unwrap_or(0), - delay: result.0.get(1).unwrap_or(&Some(0)).unwrap_or(0), + vt: result + .0 + .first() + .and_then(Option::as_ref) + .map(|dur| Duration::from_millis(*dur)) + .unwrap_or(Duration::ZERO), + delay: result + .0 + .get(1) + .and_then(Option::as_ref) + .map(|dur| Duration::from_millis(*dur)) + .unwrap_or(Duration::ZERO), maxsize: result.0.get(2).unwrap_or(&Some(0)).unwrap_or(0), totalrecv: result.0.get(3).unwrap_or(&Some(0)).unwrap_or(0), totalsent: result.0.get(4).unwrap_or(&Some(0)).unwrap_or(0), @@ -277,13 +290,12 @@ impl RsmqFunctions { &self, conn: &mut T, qname: &str, - seconds_hidden: Option, + seconds_hidden: Option, ) -> RsmqResult>> { let queue = self.get_queue(conn, qname, false).await?; - let seconds_hidden = seconds_hidden.unwrap_or(queue.vt); - - number_in_range(seconds_hidden, 0, 9_999_999)?; + let seconds_hidden = get_redis_duration(seconds_hidden, &queue.vt); + number_in_range(seconds_hidden, 0, 9_999_999_000)?; let result: (bool, String, Vec, u64, u64) = RECEIVE_MESSAGE .key(format!("{}{}", self.ns, qname)) @@ -313,11 +325,11 @@ impl RsmqFunctions { conn: &mut T, qname: &str, message: E, - delay: Option, + delay: Option, ) -> RsmqResult { let queue = self.get_queue(conn, qname, true).await?; - let delay = delay.unwrap_or(queue.delay); + let delay = get_redis_duration(delay, &queue.delay); let key = format!("{}{}", self.ns, qname); number_in_range(delay, 0, 9_999_999)?; @@ -386,8 +398,8 @@ impl RsmqFunctions { &self, conn: &mut T, qname: &str, - seconds_hidden: Option, - delay: Option, + seconds_hidden: Option, + delay: Option, maxsize: Option, ) -> RsmqResult { self.get_queue(conn, qname, false).await?; @@ -405,8 +417,9 @@ impl RsmqFunctions { .arg("modified") .arg(time.0); - if let Some(duration) = seconds_hidden { - number_in_range(duration, 0, 9_999_999)?; + if seconds_hidden.is_some() { + let duration = get_redis_duration(seconds_hidden, &Duration::from_secs(30)); + number_in_range(duration, 0, 9_999_999_000)?; commands = commands .cmd("HSET") .arg(&queue_name) @@ -414,7 +427,8 @@ impl RsmqFunctions { .arg(duration); } - if let Some(delay) = delay { + if delay.is_some() { + let delay = get_redis_duration(delay, &Duration::ZERO); number_in_range(delay, 0, 9_999_999)?; commands = commands .cmd("HSET") @@ -454,7 +468,7 @@ impl RsmqFunctions { .query_async(conn) .await?; - let time_seconds = (result.1).0; + let time_millis = (result.1).0 * 1000; let (hmget_first, hmget_second, hmget_third) = match (result.0.get(0), result.0.get(1), result.0.get(2)) { @@ -463,20 +477,22 @@ impl RsmqFunctions { }; let quid = if uid { - Some(radix_36(time_seconds).to_string() + &RsmqFunctions::::make_id(22)?) + Some(radix_36(time_millis).to_string() + &RsmqFunctions::::make_id(22)?) } else { None }; Ok(QueueDescriptor { - vt: hmget_first.parse().map_err(|_| RsmqError::CannotParseVT)?, - delay: hmget_second - .parse() - .map_err(|_| RsmqError::CannotParseDelay)?, + vt: Duration::from_millis(hmget_first.parse().map_err(|_| RsmqError::CannotParseVT)?), + delay: Duration::from_millis( + hmget_second + .parse() + .map_err(|_| RsmqError::CannotParseDelay)?, + ), maxsize: hmget_third .parse() .map_err(|_| RsmqError::CannotParseMaxsize)?, - ts: time_seconds, + ts: time_millis, uid: quid, }) } @@ -527,3 +543,11 @@ fn valid_name_format(name: &str) -> RsmqResult<()> { Ok(()) } + +fn get_redis_duration(d: Option, default: &Duration) -> u64 { + d.as_ref() + .map(Duration::as_millis) + .map(u64::try_from) + .and_then(Result::ok) + .unwrap_or_else(|| u64::try_from(default.as_millis()).ok().unwrap_or(30_000)) +} diff --git a/src/multiplexed_facade.rs b/src/multiplexed_facade.rs index 59ca4a2..badd092 100644 --- a/src/multiplexed_facade.rs +++ b/src/multiplexed_facade.rs @@ -4,6 +4,7 @@ use crate::types::{RedisBytes, RsmqMessage, RsmqOptions, RsmqQueueAttributes}; use crate::RsmqResult; use core::convert::TryFrom; use core::marker::PhantomData; +use std::time::Duration; #[derive(Clone)] struct RedisConnection(redis::aio::MultiplexedConnection); @@ -66,7 +67,7 @@ impl RsmqConnection for MultiplexedRsmq { &mut self, qname: &str, message_id: &str, - seconds_hidden: u64, + seconds_hidden: Duration, ) -> RsmqResult<()> { self.functions .change_message_visibility(&mut self.connection.0, qname, message_id, seconds_hidden) @@ -76,8 +77,8 @@ impl RsmqConnection for MultiplexedRsmq { async fn create_queue( &mut self, qname: &str, - seconds_hidden: Option, - delay: Option, + seconds_hidden: Option, + delay: Option, maxsize: Option, ) -> RsmqResult<()> { self.functions @@ -123,7 +124,7 @@ impl RsmqConnection for MultiplexedRsmq { async fn receive_message>>( &mut self, qname: &str, - seconds_hidden: Option, + seconds_hidden: Option, ) -> RsmqResult>> { self.functions .receive_message::(&mut self.connection.0, qname, seconds_hidden) @@ -134,7 +135,7 @@ impl RsmqConnection for MultiplexedRsmq { &mut self, qname: &str, message: E, - delay: Option, + delay: Option, ) -> RsmqResult { self.functions .send_message(&mut self.connection.0, qname, message, delay) @@ -144,8 +145,8 @@ impl RsmqConnection for MultiplexedRsmq { async fn set_queue_attributes( &mut self, qname: &str, - seconds_hidden: Option, - delay: Option, + seconds_hidden: Option, + delay: Option, maxsize: Option, ) -> RsmqResult { self.functions diff --git a/src/normal_facade.rs b/src/normal_facade.rs index 1cce125..fe7eace 100644 --- a/src/normal_facade.rs +++ b/src/normal_facade.rs @@ -4,6 +4,7 @@ use crate::types::{RedisBytes, RsmqMessage, RsmqOptions, RsmqQueueAttributes}; use crate::RsmqResult; use core::convert::TryFrom; use core::marker::PhantomData; +use std::time::Duration; struct RedisConnection(redis::aio::Connection); @@ -65,7 +66,7 @@ impl RsmqConnection for Rsmq { &mut self, qname: &str, message_id: &str, - seconds_hidden: u64, + seconds_hidden: Duration, ) -> RsmqResult<()> { self.functions .change_message_visibility(&mut self.connection.0, qname, message_id, seconds_hidden) @@ -75,8 +76,8 @@ impl RsmqConnection for Rsmq { async fn create_queue( &mut self, qname: &str, - seconds_hidden: Option, - delay: Option, + seconds_hidden: Option, + delay: Option, maxsize: Option, ) -> RsmqResult<()> { self.functions @@ -122,7 +123,7 @@ impl RsmqConnection for Rsmq { async fn receive_message>>( &mut self, qname: &str, - seconds_hidden: Option, + seconds_hidden: Option, ) -> RsmqResult>> { self.functions .receive_message::(&mut self.connection.0, qname, seconds_hidden) @@ -133,7 +134,7 @@ impl RsmqConnection for Rsmq { &mut self, qname: &str, message: E, - delay: Option, + delay: Option, ) -> RsmqResult { self.functions .send_message(&mut self.connection.0, qname, message, delay) @@ -143,8 +144,8 @@ impl RsmqConnection for Rsmq { async fn set_queue_attributes( &mut self, qname: &str, - seconds_hidden: Option, - delay: Option, + seconds_hidden: Option, + delay: Option, maxsize: Option, ) -> RsmqResult { self.functions diff --git a/src/pooled_facade.rs b/src/pooled_facade.rs index 04f55e0..7a52ae5 100644 --- a/src/pooled_facade.rs +++ b/src/pooled_facade.rs @@ -7,6 +7,7 @@ use async_trait::async_trait; use core::convert::TryFrom; use redis::RedisError; use std::marker::PhantomData; +use std::time::Duration; #[derive(Clone, Debug)] pub struct RedisConnectionManager { @@ -104,7 +105,7 @@ impl RsmqConnection for PooledRsmq { &mut self, qname: &str, message_id: &str, - seconds_hidden: u64, + seconds_hidden: Duration, ) -> RsmqResult<()> { let mut conn = self.pool.get().await?; @@ -116,8 +117,8 @@ impl RsmqConnection for PooledRsmq { async fn create_queue( &mut self, qname: &str, - seconds_hidden: Option, - delay: Option, + seconds_hidden: Option, + delay: Option, maxsize: Option, ) -> RsmqResult<()> { let mut conn = self.pool.get().await?; @@ -161,7 +162,7 @@ impl RsmqConnection for PooledRsmq { async fn receive_message>>( &mut self, qname: &str, - seconds_hidden: Option, + seconds_hidden: Option, ) -> RsmqResult>> { let mut conn = self.pool.get().await?; @@ -174,7 +175,7 @@ impl RsmqConnection for PooledRsmq { &mut self, qname: &str, message: E, - delay: Option, + delay: Option, ) -> RsmqResult { let mut conn = self.pool.get().await?; @@ -186,8 +187,8 @@ impl RsmqConnection for PooledRsmq { async fn set_queue_attributes( &mut self, qname: &str, - seconds_hidden: Option, - delay: Option, + seconds_hidden: Option, + delay: Option, maxsize: Option, ) -> RsmqResult { let mut conn = self.pool.get().await?; diff --git a/src/trait.rs b/src/trait.rs index 87936f7..1e29196 100644 --- a/src/trait.rs +++ b/src/trait.rs @@ -2,17 +2,18 @@ use crate::types::RedisBytes; use crate::types::{RsmqMessage, RsmqQueueAttributes}; use crate::RsmqResult; use core::convert::TryFrom; +use std::time::Duration; #[async_trait::async_trait] pub trait RsmqConnection { /// Change the hidden time of a already sent message. - /// + /// /// `seconds_hidden` has a max time of 9_999_999 for compatibility reasons to this library JS version counterpart async fn change_message_visibility( &mut self, qname: &str, message_id: &str, - seconds_hidden: u64, + seconds_hidden: Duration, ) -> RsmqResult<()>; /// Creates a new queue. Attributes can be later modified with "set_queue_attributes" method @@ -27,8 +28,8 @@ pub trait RsmqConnection { async fn create_queue( &mut self, qname: &str, - seconds_hidden: Option, - delay: Option, + seconds_hidden: Option, + delay: Option, maxsize: Option, ) -> RsmqResult<()>; @@ -53,21 +54,21 @@ pub trait RsmqConnection { /// Returns a message. The message stays hidden for some time (defined by "seconds_hidden" argument or the queue /// settings). After that time, the message will be redelivered. In order to avoid the redelivery, you need to use /// the "delete_message" after this function. - /// + /// /// `seconds_hidden` has a max time of 9_999_999 for compatibility reasons to this library JS version counterpart. async fn receive_message>>( &mut self, qname: &str, - seconds_hidden: Option, + seconds_hidden: Option, ) -> RsmqResult>>; - /// Sends a message to the queue. The message will be delayed some time (controlled by the "delayed" argument or + /// Sends a message to the queue. The message will be delayed some time (controlled by the "delayed" argument or /// the queue settings) before being delivered to a client. async fn send_message + Send>( &mut self, qname: &str, message: E, - delay: Option, + delay: Option, ) -> RsmqResult; /// Modify the queue attributes. Keep in mind that "seconds_hidden" and "delay" can be overwritten when the message @@ -83,8 +84,8 @@ pub trait RsmqConnection { async fn set_queue_attributes( &mut self, qname: &str, - seconds_hidden: Option, - delay: Option, + seconds_hidden: Option, + delay: Option, maxsize: Option, ) -> RsmqResult; } diff --git a/src/types.rs b/src/types.rs index f770a2c..8528418 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,9 +1,9 @@ -use std::convert::TryFrom; +use std::{convert::TryFrom, time::Duration}; #[derive(Debug)] pub(crate) struct QueueDescriptor { - pub vt: u64, - pub delay: u64, + pub vt: Duration, + pub delay: Duration, pub maxsize: i64, pub ts: u64, pub uid: Option, @@ -61,11 +61,11 @@ pub struct RsmqMessage = String> { /// Struct defining a queue. They are set on "create_queue" and "set_queue_attributes" #[derive(Debug, Clone)] pub struct RsmqQueueAttributes { - /// How many seconds the message will be hidden when is received by a client - pub vt: u64, + /// How long the message will be hidden when is received by a client + pub vt: Duration, /// How many second will take until the message is delivered to a client /// since it was sent - pub delay: u64, + pub delay: Duration, /// Max size of the message in bytes in the queue pub maxsize: u64, /// Number of messages received by the queue diff --git a/tests/test.rs b/tests/test.rs index 2228adb..6bac970 100644 --- a/tests/test.rs +++ b/tests/test.rs @@ -1,7 +1,7 @@ mod support; use rsmq_async::{RedisBytes, Rsmq, RsmqConnection, RsmqError}; -use std::convert::TryFrom; +use std::{convert::TryFrom, time::Duration}; use support::*; #[test] @@ -52,7 +52,7 @@ fn send_receiving_delayed_message() { rsmq.create_queue("queue1", None, None, None).await.unwrap(); - rsmq.send_message("queue1", "testmessage", Some(2)) + rsmq.send_message("queue1", "testmessage", Some(Duration::from_secs(2))) .await .unwrap(); @@ -287,8 +287,8 @@ fn updating_queue() { let attributes = rsmq.get_queue_attributes("queue4").await.unwrap(); - assert_eq!(attributes.vt, 30); - assert_eq!(attributes.delay, 0); + assert_eq!(attributes.vt, Duration::from_secs(30)); + assert_eq!(attributes.delay, Duration::ZERO); assert_eq!(attributes.maxsize, 65536); assert_eq!(attributes.totalrecv, 0); assert_eq!(attributes.totalsent, 0); @@ -297,14 +297,19 @@ fn updating_queue() { assert!(attributes.created > 0); assert!(attributes.modified > 0); - rsmq.set_queue_attributes("queue4", Some(45), Some(5), Some(2048)) - .await - .unwrap(); + rsmq.set_queue_attributes( + "queue4", + Some(Duration::from_secs(45)), + Some(Duration::from_secs(5)), + Some(2048), + ) + .await + .unwrap(); let attributes = rsmq.get_queue_attributes("queue4").await.unwrap(); - assert_eq!(attributes.vt, 45); - assert_eq!(attributes.delay, 5); + assert_eq!(attributes.vt, Duration::from_secs(45)); + assert_eq!(attributes.delay, Duration::from_secs(5)); assert_eq!(attributes.maxsize, 2048); assert_eq!(attributes.totalrecv, 0); assert_eq!(attributes.totalsent, 0); @@ -357,7 +362,12 @@ fn deleting_queue() { } let result = rsmq - .set_queue_attributes("queue5", Some(45), Some(5), Some(2048)) + .set_queue_attributes( + "queue5", + Some(Duration::from_secs(45)), + Some(Duration::from_secs(5)), + Some(2048), + ) .await; assert!(result.is_err()); @@ -398,7 +408,7 @@ fn change_message_visibility() { .unwrap(); assert!(message.is_none()); - rsmq.change_message_visibility("queue6", &message_id, 0) + rsmq.change_message_visibility("queue6", &message_id, Duration::ZERO) .await .unwrap();