diff --git a/src/functions.rs b/src/functions.rs index faa7047..2607752 100644 --- a/src/functions.rs +++ b/src/functions.rs @@ -19,7 +19,7 @@ lazy_static! { Script::new(include_str!("./redis-scripts/receiveMessage.lua")); } -static JS_COMPAT_MAX_TIME_MILLIS: u64 = 9_999_999_000; +const JS_COMPAT_MAX_TIME_MILLIS: u64 = 9_999_999_000; /// The main object of this library. Creates/Handles the redis connection and contains all the methods #[derive(Clone)] @@ -212,7 +212,7 @@ impl RsmqFunctions { .arg(&key) .cmd("ZCOUNT") .arg(&key) - .arg(time.0) + .arg(time.0 * 1000) .arg("+inf") .query_async(conn) .await?; @@ -470,7 +470,7 @@ impl RsmqFunctions { .query_async(conn) .await?; - let time_millis = (result.1).0 * 1000; + let time_micros = (result.1).0 * 1000000 + (result.1).1; let (hmget_first, hmget_second, hmget_third) = match (result.0.first(), result.0.get(1), result.0.get(2)) { @@ -479,7 +479,7 @@ impl RsmqFunctions { }; let quid = if uid { - Some(radix_36(time_millis).to_string() + &RsmqFunctions::::make_id(22)?) + Some(radix_36(time_micros).to_string() + &RsmqFunctions::::make_id(22)?) } else { None }; @@ -494,7 +494,7 @@ impl RsmqFunctions { maxsize: hmget_third .parse() .map_err(|_| RsmqError::CannotParseMaxsize)?, - ts: time_millis, + ts: time_micros / 1000, uid: quid, }) } diff --git a/tests/test.rs b/tests/test.rs index fe9cf31..d1892d1 100644 --- a/tests/test.rs +++ b/tests/test.rs @@ -447,3 +447,40 @@ fn change_queue_size() { assert_eq!(attributes.maxsize, -1); }) } + +#[test] +fn sent_messages_must_keep_order() { + let rt = tokio::runtime::Runtime::new().unwrap(); + + rt.block_on(async move { + let ctx = TestContext::new(); + let connection = ctx.async_connection().await.unwrap(); + let mut rsmq = Rsmq::new_with_connection(connection, false, None); + + rsmq.create_queue("queue1", None, None, None).await.unwrap(); + + for i in 0..10000 { + rsmq.send_message("queue1", format!("testmessage{}", i), None) + .await + .unwrap(); + } + + for i in 0..10000 { + let message = rsmq + .receive_message::("queue1", None) + .await + .unwrap().unwrap(); + assert_eq!(message.message, format!("testmessage{}", i)); + + rsmq.delete_message("queue1", &message.id).await.unwrap(); + } + + let message = rsmq + .receive_message::("queue1", None) + .await + .unwrap(); + + assert!(message.is_none()); + rsmq.delete_queue("queue1").await.unwrap(); + }) +}