From ba14653815310c0851362f7ca99f0f8f63135f77 Mon Sep 17 00:00:00 2001 From: David Bonet Date: Sun, 28 Jul 2024 22:45:05 +0200 Subject: [PATCH] use SCRIPT LOAD for redis scripts --- CHANGELOG.md | 9 +- Cargo.toml | 2 +- src/functions.rs | 126 ++++++++++++++---- src/multiplexed_facade.rs | 46 ++++--- src/pooled_facade.rs | 52 ++++++-- src/redis-scripts/changeMessageVisibility.lua | 2 +- src/redis-scripts/popMessage.lua | 17 --- src/redis-scripts/receiveMessage.lua | 67 ++++++++-- src/sync_facade.rs | 38 ++++-- tests/support/mod.rs | 1 + tests/test.rs | 53 ++++++-- 11 files changed, 296 insertions(+), 117 deletions(-) delete mode 100644 src/redis-scripts/popMessage.lua diff --git a/CHANGELOG.md b/CHANGELOG.md index b4feecc..20eaf75 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,8 +1,15 @@ # Changelog +## 12.0.0 + +Makes it so the scripts are loaded using `SCRIPT LOAD` so they aren't sent +to redis each time. + +Adds disabled-by-default feature that enables milisecond time precission. + ## 10.0.0 - 2024-05-08 -Change format from queue names for compatibiliti with the original Nodejs version of the crate. +Change format from queue names for compatibiliti with the original Nodejs version of the crate. Change details in here: https://github.com/DavidBM/rsmq-async-rs/pull/20 ### Changed diff --git a/Cargo.toml b/Cargo.toml index 8607ddc..500ef29 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rsmq_async" -version = "11.2.0" +version = "12.0.0" authors = [ "David Bonet " ] diff --git a/src/functions.rs b/src/functions.rs index 36d4c76..336b013 100644 --- a/src/functions.rs +++ b/src/functions.rs @@ -4,21 +4,12 @@ use crate::{ RsmqError, RsmqResult, }; use core::convert::TryFrom; -use lazy_static::lazy_static; use radix_fmt::radix_36; use rand::seq::IteratorRandom; -use redis::{aio::ConnectionLike, pipe, Script}; +use redis::{aio::ConnectionLike, pipe}; use std::convert::TryInto; use std::time::Duration; -lazy_static! { - static ref CHANGE_MESSAGE_VISIVILITY: Script = - Script::new(include_str!("./redis-scripts/changeMessageVisibility.lua")); - static ref POP_MESSAGE: Script = Script::new(include_str!("./redis-scripts/popMessage.lua")); - static ref RECEIVE_MESSAGE: Script = - Script::new(include_str!("./redis-scripts/receiveMessage.lua")); -} - const JS_COMPAT_MAX_TIME_MILLIS: u64 = 9_999_999_000; #[cfg(feature = "break-js-comp")] @@ -40,6 +31,75 @@ impl std::fmt::Debug for RsmqFunctions { } } +#[derive(Debug, Clone)] +pub struct CachedScript { + change_message_visibility_sha1: String, + receive_message_sha1: String, +} + +impl CachedScript { + async fn init(conn: &mut T) -> RsmqResult { + let change_message_visibility_sha1: String = redis::cmd("SCRIPT") + .arg("LOAD") + .arg(include_str!("./redis-scripts/changeMessageVisibility.lua")) + .query_async(conn) + .await?; + let receive_message_sha1: String = redis::cmd("SCRIPT") + .arg("LOAD") + .arg(include_str!("./redis-scripts/receiveMessage.lua")) + .query_async(conn) + .await?; + Ok(Self { + change_message_visibility_sha1, + receive_message_sha1, + }) + } + + async fn invoke_change_message_visibility( + &self, + conn: &mut T, + key1: String, + key2: String, + key3: String, + ) -> RsmqResult + where + R: redis::FromRedisValue, + { + redis::cmd("EVALSHA") + .arg(&self.change_message_visibility_sha1) + .arg(3) + .arg(key1) + .arg(key2) + .arg(key3) + .query_async(conn) + .await + .map_err(Into::into) + } + + async fn invoke_receive_message( + &self, + conn: &mut T, + key1: String, + key2: String, + key3: String, + should_delete: String, + ) -> RsmqResult + where + R: redis::FromRedisValue, + { + redis::cmd("EVALSHA") + .arg(&self.receive_message_sha1) + .arg(3) + .arg(key1) + .arg(key2) + .arg(key3) + .arg(should_delete) + .query_async(conn) + .await + .map_err(Into::into) + } +} + impl RsmqFunctions { /// Change the hidden time of a already sent message. pub async fn change_message_visibility( @@ -48,6 +108,7 @@ impl RsmqFunctions { qname: &str, message_id: &str, hidden: Duration, + cached_script: &CachedScript, ) -> RsmqResult<()> { let hidden = get_redis_duration(Some(hidden), &Duration::from_secs(30)); @@ -55,16 +116,22 @@ impl RsmqFunctions { number_in_range(hidden, 0, JS_COMPAT_MAX_TIME_MILLIS)?; - CHANGE_MESSAGE_VISIVILITY - .key(format!("{}:{}", self.ns, qname)) - .key(message_id) - .key(queue.ts + hidden) - .invoke_async::<_, bool>(conn) + cached_script + .invoke_change_message_visibility::<_, T>( + conn, + format!("{}:{}", self.ns, qname), + message_id.to_string(), + (queue.ts + hidden).to_string(), + ) .await?; Ok(()) } + pub async fn load_scripts(&self, conn: &mut T) -> RsmqResult { + CachedScript::init(conn).await + } + /// Creates a new queue. Attributes can be later modified with "set_queue_attributes" method /// /// hidden: Time the messages will be hidden when they are received with the "receive_message" method. @@ -266,13 +333,18 @@ impl RsmqFunctions { &self, conn: &mut T, qname: &str, + cached_script: &CachedScript, ) -> RsmqResult>> { let queue = self.get_queue(conn, qname, false).await?; - let result: (bool, String, Vec, u64, u64) = POP_MESSAGE - .key(format!("{}:{}", self.ns, qname)) - .key(queue.ts) - .invoke_async(conn) + let result: (bool, String, Vec, u64, u64) = cached_script + .invoke_receive_message( + conn, + format!("{}:{}", self.ns, qname), + queue.ts.to_string(), + queue.ts.to_string(), + "true".to_string(), + ) .await?; if !result.0 { @@ -298,17 +370,21 @@ impl RsmqFunctions { conn: &mut T, qname: &str, hidden: Option, + cached_script: &CachedScript, ) -> RsmqResult>> { let queue = self.get_queue(conn, qname, false).await?; let hidden = get_redis_duration(hidden, &queue.vt); number_in_range(hidden, 0, JS_COMPAT_MAX_TIME_MILLIS)?; - let result: (bool, String, Vec, u64, u64) = RECEIVE_MESSAGE - .key(format!("{}:{}", self.ns, qname)) - .key(queue.ts) - .key(queue.ts + hidden) - .invoke_async(conn) + let result: (bool, String, Vec, u64, u64) = cached_script + .invoke_receive_message( + conn, + format!("{}:{}", self.ns, qname), + queue.ts.to_string(), + (queue.ts + hidden).to_string(), + "false".to_string(), + ) .await?; if !result.0 { @@ -474,7 +550,7 @@ impl RsmqFunctions { .cmd("TIME") .query_async(conn) .await?; - + #[cfg(feature = "break-js-comp")] let time = (result.1).0 * 1000000 + (result.1).1; #[cfg(not(feature = "break-js-comp"))] diff --git a/src/multiplexed_facade.rs b/src/multiplexed_facade.rs index 59478e6..3b8c6ae 100644 --- a/src/multiplexed_facade.rs +++ b/src/multiplexed_facade.rs @@ -1,4 +1,4 @@ -use crate::functions::RsmqFunctions; +use crate::functions::{CachedScript, RsmqFunctions}; use crate::r#trait::RsmqConnection; use crate::types::{RedisBytes, RsmqMessage, RsmqOptions, RsmqQueueAttributes}; use crate::RsmqResult; @@ -19,6 +19,7 @@ impl std::fmt::Debug for RedisConnection { pub struct Rsmq { connection: RedisConnection, functions: RsmqFunctions, + scripts: CachedScript, } impl Rsmq { @@ -37,27 +38,28 @@ impl Rsmq { let connection = client.get_multiplexed_async_connection().await?; - Ok(Rsmq::new_with_connection( - connection, - options.realtime, - Some(&options.ns), - )) + Rsmq::new_with_connection(connection, options.realtime, Some(&options.ns)).await } /// Special method for when you already have a redis-rs connection and you don't want redis_async to create a new one. - pub fn new_with_connection( - connection: redis::aio::MultiplexedConnection, + pub async fn new_with_connection( + mut connection: redis::aio::MultiplexedConnection, realtime: bool, ns: Option<&str>, - ) -> Rsmq { - Rsmq { + ) -> RsmqResult { + let functions = RsmqFunctions { + ns: ns.unwrap_or("rsmq").to_string(), + realtime, + conn: PhantomData, + }; + + let scripts = functions.load_scripts(&mut connection).await?; + + Ok(Rsmq { connection: RedisConnection(connection), - functions: RsmqFunctions { - ns: ns.unwrap_or("rsmq").to_string(), - realtime, - conn: PhantomData, - }, - } + functions, + scripts, + }) } } @@ -70,7 +72,13 @@ impl RsmqConnection for Rsmq { hidden: Duration, ) -> RsmqResult<()> { self.functions - .change_message_visibility(&mut self.connection.0, qname, message_id, hidden) + .change_message_visibility( + &mut self.connection.0, + qname, + message_id, + hidden, + &self.scripts, + ) .await } @@ -111,7 +119,7 @@ impl RsmqConnection for Rsmq { qname: &str, ) -> RsmqResult>> { self.functions - .pop_message::(&mut self.connection.0, qname) + .pop_message::(&mut self.connection.0, qname, &self.scripts) .await } @@ -121,7 +129,7 @@ impl RsmqConnection for Rsmq { hidden: Option, ) -> RsmqResult>> { self.functions - .receive_message::(&mut self.connection.0, qname, hidden) + .receive_message::(&mut self.connection.0, qname, hidden, &self.scripts) .await } diff --git a/src/pooled_facade.rs b/src/pooled_facade.rs index c7f1c24..3b0fd37 100644 --- a/src/pooled_facade.rs +++ b/src/pooled_facade.rs @@ -1,4 +1,4 @@ -use crate::functions::RsmqFunctions; +use crate::functions::{CachedScript, RsmqFunctions}; use crate::r#trait::RsmqConnection; use crate::types::RedisBytes; use crate::types::{RsmqMessage, RsmqOptions, RsmqQueueAttributes}; @@ -50,6 +50,7 @@ pub struct PoolOptions { pub struct PooledRsmq { pool: bb8::Pool, functions: RsmqFunctions, + scripts: CachedScript, } impl Clone for PooledRsmq { @@ -61,6 +62,7 @@ impl Clone for PooledRsmq { realtime: self.functions.realtime, conn: PhantomData, }, + scripts: self.scripts.clone(), } } } @@ -91,29 +93,51 @@ impl PooledRsmq { let pool = builder.build(manager).await?; + let mut conn = pool.get().await?; + + let functions = RsmqFunctions:: { + ns: options.ns.clone(), + realtime: options.realtime, + conn: PhantomData, + }; + + let scripts = functions.load_scripts(&mut conn).await?; + + drop(conn); + Ok(PooledRsmq { pool, - functions: RsmqFunctions { - ns: options.ns.clone(), - realtime: options.realtime, - conn: PhantomData, - }, + functions, + scripts, }) } - pub fn new_with_pool( + pub async fn new_with_pool( pool: bb8::Pool, realtime: bool, ns: Option<&str>, - ) -> PooledRsmq { - PooledRsmq { + ) -> RsmqResult { + let mut conn = pool.get().await?; + + let functions = RsmqFunctions:: { + ns: ns.unwrap_or("rsmq").to_string(), + realtime, + conn: PhantomData, + }; + + let scripts = functions.load_scripts(&mut conn).await?; + + drop(conn); + + Ok(PooledRsmq { pool, functions: RsmqFunctions { ns: ns.unwrap_or("rsmq").to_string(), realtime, conn: PhantomData, }, - } + scripts, + }) } } @@ -128,7 +152,7 @@ impl RsmqConnection for PooledRsmq { let mut conn = self.pool.get().await?; self.functions - .change_message_visibility(&mut conn, qname, message_id, hidden) + .change_message_visibility(&mut conn, qname, message_id, hidden, &self.scripts) .await } @@ -174,7 +198,9 @@ impl RsmqConnection for PooledRsmq { ) -> RsmqResult>> { let mut conn = self.pool.get().await?; - self.functions.pop_message::(&mut conn, qname).await + self.functions + .pop_message::(&mut conn, qname, &self.scripts) + .await } async fn receive_message>>( @@ -185,7 +211,7 @@ impl RsmqConnection for PooledRsmq { let mut conn = self.pool.get().await?; self.functions - .receive_message::(&mut conn, qname, hidden) + .receive_message::(&mut conn, qname, hidden, &self.scripts) .await } diff --git a/src/redis-scripts/changeMessageVisibility.lua b/src/redis-scripts/changeMessageVisibility.lua index e9dd125..029d848 100644 --- a/src/redis-scripts/changeMessageVisibility.lua +++ b/src/redis-scripts/changeMessageVisibility.lua @@ -1,6 +1,6 @@ local msg = redis.call("ZSCORE", KEYS[1], KEYS[2]) if not msg then - return false + return false end redis.call("ZADD", KEYS[1], KEYS[3], KEYS[2]) return true diff --git a/src/redis-scripts/popMessage.lua b/src/redis-scripts/popMessage.lua deleted file mode 100644 index 75aaee1..0000000 --- a/src/redis-scripts/popMessage.lua +++ /dev/null @@ -1,17 +0,0 @@ -local msg = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", KEYS[2], "LIMIT", "0", "1") -if #msg == 0 then - return {false, "", "", 0, 0} -end -redis.call("HINCRBY", KEYS[1] .. ":Q", "totalrecv", 1) -local mbody = redis.call("HGET", KEYS[1] .. ":Q", msg[1]) -local rc = redis.call("HINCRBY", KEYS[1] .. ":Q", msg[1] .. ":rc", 1) -local o = {true, msg[1], mbody, rc} -if rc==1 then - table.insert(o, KEYS[2]) -else - local fr = redis.call("HGET", KEYS[1] .. ":Q", msg[1] .. ":fr") - table.insert(o, fr) -end -redis.call("ZREM", KEYS[1], msg[1]) -redis.call("HDEL", KEYS[1] .. ":Q", msg[1], msg[1] .. ":rc", msg[1] .. ":fr") -return o diff --git a/src/redis-scripts/receiveMessage.lua b/src/redis-scripts/receiveMessage.lua index e59eccc..b970a3e 100644 --- a/src/redis-scripts/receiveMessage.lua +++ b/src/redis-scripts/receiveMessage.lua @@ -1,17 +1,58 @@ -local msg = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", KEYS[2], "LIMIT", "0", "1") -if #msg == 0 then - return {false, "", "", 0, 0} +-- This function either retrieves a message from the Redis queue, updates its visibility timeout, +-- increments counters, and returns message details, or removes the message if specified. +-- KEYS[1]: The Redis key for the sorted set representing the message queue. +-- KEYS[2]: The current time or a specific timestamp used for score comparisons. +-- KEYS[3]: The new visibility timestamp used to update the message score. +-- ARGV[1]: A string "true" or "false" indicating whether to delete the message after processing. + +-- Find the next message due to be visible based on the current time (KEYS[2]) +local message = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", KEYS[2], "LIMIT", "0", "1") + +-- If no message is found, return a default empty response +if #message == 0 then + return { false, "", "", 0, 0 } end -redis.call("ZADD", KEYS[1], KEYS[3], msg[1]) + +-- Check if the message should be deleted +local should_delete = ARGV[1] == "true" + +-- Increment the total received count for the queue redis.call("HINCRBY", KEYS[1] .. ":Q", "totalrecv", 1) -local mbody = redis.call("HGET", KEYS[1] .. ":Q", msg[1]) -local rc = redis.call("HINCRBY", KEYS[1] .. ":Q", msg[1] .. ":rc", 1) -local o = {true, msg[1], mbody, rc} -if rc==1 then - redis.call("HSET", KEYS[1] .. ":Q", msg[1] .. ":fr", KEYS[2]) - table.insert(o, KEYS[2]) + +-- Get the message body from the hash +local messageBody = redis.call("HGET", KEYS[1] .. ":Q", message[1]) + +-- Increment the receive count for this message +local receiveCount = redis.call("HINCRBY", KEYS[1] .. ":Q", message[1] .. ":rc", 1) + +-- Prepare the response table with message details +local response = { true, message[1], messageBody, receiveCount } + +-- If the message is received for the first time, set and add the current time to the response +if receiveCount == 1 then + redis.call("HSET", KEYS[1] .. ":Q", message[1] .. ":fr", KEYS[2]) + table.insert(response, KEYS[2]) else - local fr = redis.call("HGET", KEYS[1] .. ":Q", msg[1] .. ":fr") - table.insert(o, fr) + -- Otherwise, get the first received time and add it to the response + local firstReceived = redis.call("HGET", KEYS[1] .. ":Q", message[1] .. ":fr") + table.insert(response, firstReceived) end -return o + +-- Update or remove the message based on the should_delete flag +if should_delete then + -- Remove the message from the sorted set + redis.call("ZREM", KEYS[1], message[1]) + -- Delete the message details from the hash + redis.call("HDEL", KEYS[1] .. ":Q", message[1], message[1] .. ":rc", message[1] .. ":fr") +else + -- Update the message's score to the new visibility timestamp (KEYS[3]) + redis.call("ZADD", KEYS[1], KEYS[3], message[1]) +end + +-- Return the response containing: +-- [1] boolean indicating if a message was found, +-- [2] message ID, +-- [3] message body, +-- [4] receive count, +-- [5] first received timestamp (either current time or previously set time) +return response diff --git a/src/sync_facade.rs b/src/sync_facade.rs index 19c1cff..64b8ace 100644 --- a/src/sync_facade.rs +++ b/src/sync_facade.rs @@ -1,6 +1,4 @@ -use tokio::runtime::Runtime; - -use crate::functions::RsmqFunctions; +use crate::functions::{CachedScript, RsmqFunctions}; use crate::r#trait::RsmqConnection; use crate::types::{RedisBytes, RsmqMessage, RsmqOptions, RsmqQueueAttributes}; use crate::{RsmqError, RsmqResult}; @@ -8,6 +6,7 @@ use core::convert::TryFrom; use core::marker::PhantomData; use std::sync::Arc; use std::time::Duration; +use tokio::runtime::Runtime; #[derive(Clone)] struct RedisConnection(redis::aio::MultiplexedConnection); @@ -23,6 +22,7 @@ pub struct RsmqSync { connection: RedisConnection, functions: RsmqFunctions, runner: Arc, + scripts: CachedScript, } impl RsmqSync { @@ -44,17 +44,23 @@ impl RsmqSync { let client = redis::Client::open(conn_info)?; - let connection = - runner.block_on(async move { client.get_multiplexed_async_connection().await })?; + let functions = RsmqFunctions { + ns: options.ns, + realtime: options.realtime, + conn: PhantomData, + }; + + let (connection, scripts) = runner.block_on(async { + let mut conn = client.get_multiplexed_async_connection().await?; + let scripts = functions.load_scripts(&mut conn).await?; + Result::<_, RsmqError>::Ok((conn, scripts)) + })?; Ok(RsmqSync { connection: RedisConnection(connection), - functions: RsmqFunctions { - ns: options.ns, - realtime: options.realtime, - conn: PhantomData, - }, + functions, runner: Arc::new(runner), + scripts, }) } } @@ -69,7 +75,13 @@ impl RsmqConnection for RsmqSync { ) -> RsmqResult<()> { self.runner.block_on(async { self.functions - .change_message_visibility(&mut self.connection.0, qname, message_id, hidden) + .change_message_visibility( + &mut self.connection.0, + qname, + message_id, + hidden, + &self.scripts, + ) .await }) } @@ -121,7 +133,7 @@ impl RsmqConnection for RsmqSync { ) -> RsmqResult>> { self.runner.block_on(async { self.functions - .pop_message::(&mut self.connection.0, qname) + .pop_message::(&mut self.connection.0, qname, &self.scripts) .await }) } @@ -133,7 +145,7 @@ impl RsmqConnection for RsmqSync { ) -> RsmqResult>> { self.runner.block_on(async { self.functions - .receive_message::(&mut self.connection.0, qname, hidden) + .receive_message::(&mut self.connection.0, qname, hidden, &self.scripts) .await }) } diff --git a/tests/support/mod.rs b/tests/support/mod.rs index 8c67a6a..db1d7a8 100644 --- a/tests/support/mod.rs +++ b/tests/support/mod.rs @@ -76,6 +76,7 @@ impl Drop for RedisServer { } pub struct TestContext { + #[allow(dead_code)] pub server: RedisServer, pub client: redis::Client, } diff --git a/tests/test.rs b/tests/test.rs index a26fb38..bd42a82 100644 --- a/tests/test.rs +++ b/tests/test.rs @@ -1,6 +1,6 @@ mod support; -use rsmq_async::{RedisBytes, Rsmq, RsmqConnection, RsmqError}; +use rsmq_async::{RedisBytes, Rsmq, RsmqConnection as _, RsmqError}; use std::{convert::TryFrom, time::Duration}; use support::*; @@ -11,7 +11,9 @@ fn send_receiving_deleting_message() { rt.block_on(async move { let ctx = TestContext::new(); let connection = ctx.async_connection().await.unwrap(); - let mut rsmq = Rsmq::new_with_connection(connection, false, None); + let mut rsmq = Rsmq::new_with_connection(connection, false, None) + .await + .unwrap(); rsmq.create_queue("queue1", None, None, None).await.unwrap(); @@ -48,7 +50,9 @@ fn send_receiving_delayed_message() { rt.block_on(async move { let ctx = TestContext::new(); let connection = ctx.async_connection().await.unwrap(); - let mut rsmq = Rsmq::new_with_connection(connection, false, None); + let mut rsmq = Rsmq::new_with_connection(connection, false, None) + .await + .unwrap(); rsmq.create_queue("queue1", None, None, None).await.unwrap(); @@ -107,7 +111,9 @@ fn send_receiving_deleting_message_vec_u8() { rt.block_on(async move { let ctx = TestContext::new(); let connection = ctx.async_connection().await.unwrap(); - let mut rsmq = Rsmq::new_with_connection(connection, false, None); + let mut rsmq = Rsmq::new_with_connection(connection, false, None) + .await + .unwrap(); rsmq.create_queue("queue1", None, None, None).await.unwrap(); @@ -155,7 +161,9 @@ fn send_receiving_deleting_message_custom_type() { rt.block_on(async move { let ctx = TestContext::new(); let connection = ctx.async_connection().await.unwrap(); - let mut rsmq = Rsmq::new_with_connection(connection, false, None); + let mut rsmq = Rsmq::new_with_connection(connection, false, None) + .await + .unwrap(); rsmq.create_queue("queue1", None, None, None).await.unwrap(); @@ -192,7 +200,9 @@ fn pop_message() { rt.block_on(async move { let ctx = TestContext::new(); let connection = ctx.async_connection().await.unwrap(); - let mut rsmq = Rsmq::new_with_connection(connection, false, None); + let mut rsmq = Rsmq::new_with_connection(connection, false, None) + .await + .unwrap(); rsmq.create_queue("queue2", None, None, None).await.unwrap(); @@ -223,7 +233,9 @@ fn pop_message_vec_u8() { rt.block_on(async move { let ctx = TestContext::new(); let connection = ctx.async_connection().await.unwrap(); - let mut rsmq = Rsmq::new_with_connection(connection, false, None); + let mut rsmq = Rsmq::new_with_connection(connection, false, None) + .await + .unwrap(); rsmq.create_queue("queue2", None, None, None).await.unwrap(); @@ -254,7 +266,9 @@ fn creating_queue() { rt.block_on(async move { let ctx = TestContext::new(); let connection = ctx.async_connection().await.unwrap(); - let mut rsmq = Rsmq::new_with_connection(connection, false, None); + let mut rsmq = Rsmq::new_with_connection(connection, false, None) + .await + .unwrap(); rsmq.create_queue("queue3", None, None, None).await.unwrap(); @@ -281,7 +295,9 @@ fn updating_queue() { rt.block_on(async move { let ctx = TestContext::new(); let connection = ctx.async_connection().await.unwrap(); - let mut rsmq = Rsmq::new_with_connection(connection, false, None); + let mut rsmq = Rsmq::new_with_connection(connection, false, None) + .await + .unwrap(); rsmq.create_queue("queue4", None, None, None).await.unwrap(); @@ -329,7 +345,9 @@ fn deleting_queue() { rt.block_on(async move { let ctx = TestContext::new(); let connection = ctx.async_connection().await.unwrap(); - let mut rsmq = Rsmq::new_with_connection(connection, false, None); + let mut rsmq = Rsmq::new_with_connection(connection, false, None) + .await + .unwrap(); rsmq.create_queue("queue5", None, None, None).await.unwrap(); @@ -386,7 +404,9 @@ fn change_message_visibility() { rt.block_on(async move { let ctx = TestContext::new(); let connection = ctx.async_connection().await.unwrap(); - let mut rsmq = Rsmq::new_with_connection(connection, false, None); + let mut rsmq = Rsmq::new_with_connection(connection, false, None) + .await + .unwrap(); rsmq.create_queue("queue6", None, None, None).await.unwrap(); @@ -434,7 +454,9 @@ fn change_queue_size() { rt.block_on(async move { let ctx = TestContext::new(); let connection = ctx.async_connection().await.unwrap(); - let mut rsmq = Rsmq::new_with_connection(connection, false, None); + let mut rsmq = Rsmq::new_with_connection(connection, false, None) + .await + .unwrap(); rsmq.create_queue("queue6", None, None, None).await.unwrap(); @@ -456,7 +478,9 @@ fn sent_messages_must_keep_order() { rt.block_on(async move { let ctx = TestContext::new(); let connection = ctx.async_connection().await.unwrap(); - let mut rsmq = Rsmq::new_with_connection(connection, false, None); + let mut rsmq = Rsmq::new_with_connection(connection, false, None) + .await + .unwrap(); rsmq.create_queue("queue1", None, None, None).await.unwrap(); @@ -470,7 +494,8 @@ fn sent_messages_must_keep_order() { let message = rsmq .receive_message::("queue1", None) .await - .unwrap().unwrap(); + .unwrap() + .unwrap(); assert_eq!(message.message, format!("testmessage{}", i)); rsmq.delete_message("queue1", &message.id).await.unwrap();