Skip to content

Commit

Permalink
use SCRIPT LOAD for redis scripts
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBM committed Jul 28, 2024
1 parent 419b3ea commit ba14653
Show file tree
Hide file tree
Showing 11 changed files with 296 additions and 117 deletions.
9 changes: 8 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
# Changelog

## 12.0.0

Makes it so the scripts are loaded using `SCRIPT LOAD` so they aren't sent
to redis each time.

Adds disabled-by-default feature that enables milisecond time precission.

## 10.0.0 - 2024-05-08

Change format from queue names for compatibiliti with the original Nodejs version of the crate.
Change format from queue names for compatibiliti with the original Nodejs version of the crate.
Change details in here: https://github.com/DavidBM/rsmq-async-rs/pull/20

### Changed
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "rsmq_async"
version = "11.2.0"
version = "12.0.0"
authors = [
"David Bonet <[email protected]>"
]
Expand Down
126 changes: 101 additions & 25 deletions src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,12 @@ use crate::{
RsmqError, RsmqResult,
};
use core::convert::TryFrom;
use lazy_static::lazy_static;
use radix_fmt::radix_36;
use rand::seq::IteratorRandom;
use redis::{aio::ConnectionLike, pipe, Script};
use redis::{aio::ConnectionLike, pipe};
use std::convert::TryInto;
use std::time::Duration;

lazy_static! {
static ref CHANGE_MESSAGE_VISIVILITY: Script =
Script::new(include_str!("./redis-scripts/changeMessageVisibility.lua"));
static ref POP_MESSAGE: Script = Script::new(include_str!("./redis-scripts/popMessage.lua"));
static ref RECEIVE_MESSAGE: Script =
Script::new(include_str!("./redis-scripts/receiveMessage.lua"));
}

const JS_COMPAT_MAX_TIME_MILLIS: u64 = 9_999_999_000;

#[cfg(feature = "break-js-comp")]
Expand All @@ -40,6 +31,75 @@ impl<T: ConnectionLike> std::fmt::Debug for RsmqFunctions<T> {
}
}

#[derive(Debug, Clone)]
pub struct CachedScript {
change_message_visibility_sha1: String,
receive_message_sha1: String,
}

impl CachedScript {
async fn init<T: ConnectionLike>(conn: &mut T) -> RsmqResult<Self> {
let change_message_visibility_sha1: String = redis::cmd("SCRIPT")
.arg("LOAD")
.arg(include_str!("./redis-scripts/changeMessageVisibility.lua"))
.query_async(conn)
.await?;
let receive_message_sha1: String = redis::cmd("SCRIPT")
.arg("LOAD")
.arg(include_str!("./redis-scripts/receiveMessage.lua"))
.query_async(conn)
.await?;
Ok(Self {
change_message_visibility_sha1,
receive_message_sha1,
})
}

async fn invoke_change_message_visibility<R, T: ConnectionLike>(
&self,
conn: &mut T,
key1: String,
key2: String,
key3: String,
) -> RsmqResult<R>
where
R: redis::FromRedisValue,
{
redis::cmd("EVALSHA")
.arg(&self.change_message_visibility_sha1)
.arg(3)
.arg(key1)
.arg(key2)
.arg(key3)
.query_async(conn)
.await
.map_err(Into::into)
}

async fn invoke_receive_message<R, T: ConnectionLike>(
&self,
conn: &mut T,
key1: String,
key2: String,
key3: String,
should_delete: String,
) -> RsmqResult<R>
where
R: redis::FromRedisValue,
{
redis::cmd("EVALSHA")
.arg(&self.receive_message_sha1)
.arg(3)
.arg(key1)
.arg(key2)
.arg(key3)
.arg(should_delete)
.query_async(conn)
.await
.map_err(Into::into)
}
}

impl<T: ConnectionLike> RsmqFunctions<T> {
/// Change the hidden time of a already sent message.
pub async fn change_message_visibility(
Expand All @@ -48,23 +108,30 @@ impl<T: ConnectionLike> RsmqFunctions<T> {
qname: &str,
message_id: &str,
hidden: Duration,
cached_script: &CachedScript,
) -> RsmqResult<()> {
let hidden = get_redis_duration(Some(hidden), &Duration::from_secs(30));

let queue = self.get_queue(conn, qname, false).await?;

number_in_range(hidden, 0, JS_COMPAT_MAX_TIME_MILLIS)?;

CHANGE_MESSAGE_VISIVILITY
.key(format!("{}:{}", self.ns, qname))
.key(message_id)
.key(queue.ts + hidden)
.invoke_async::<_, bool>(conn)
cached_script
.invoke_change_message_visibility::<_, T>(
conn,
format!("{}:{}", self.ns, qname),
message_id.to_string(),
(queue.ts + hidden).to_string(),
)
.await?;

Ok(())
}

pub async fn load_scripts(&self, conn: &mut T) -> RsmqResult<CachedScript> {
CachedScript::init(conn).await
}

/// Creates a new queue. Attributes can be later modified with "set_queue_attributes" method
///
/// hidden: Time the messages will be hidden when they are received with the "receive_message" method.
Expand Down Expand Up @@ -266,13 +333,18 @@ impl<T: ConnectionLike> RsmqFunctions<T> {
&self,
conn: &mut T,
qname: &str,
cached_script: &CachedScript,
) -> RsmqResult<Option<RsmqMessage<E>>> {
let queue = self.get_queue(conn, qname, false).await?;

let result: (bool, String, Vec<u8>, u64, u64) = POP_MESSAGE
.key(format!("{}:{}", self.ns, qname))
.key(queue.ts)
.invoke_async(conn)
let result: (bool, String, Vec<u8>, u64, u64) = cached_script
.invoke_receive_message(
conn,
format!("{}:{}", self.ns, qname),
queue.ts.to_string(),
queue.ts.to_string(),
"true".to_string(),
)
.await?;

if !result.0 {
Expand All @@ -298,17 +370,21 @@ impl<T: ConnectionLike> RsmqFunctions<T> {
conn: &mut T,
qname: &str,
hidden: Option<Duration>,
cached_script: &CachedScript,
) -> RsmqResult<Option<RsmqMessage<E>>> {
let queue = self.get_queue(conn, qname, false).await?;

let hidden = get_redis_duration(hidden, &queue.vt);
number_in_range(hidden, 0, JS_COMPAT_MAX_TIME_MILLIS)?;

let result: (bool, String, Vec<u8>, u64, u64) = RECEIVE_MESSAGE
.key(format!("{}:{}", self.ns, qname))
.key(queue.ts)
.key(queue.ts + hidden)
.invoke_async(conn)
let result: (bool, String, Vec<u8>, u64, u64) = cached_script
.invoke_receive_message(
conn,
format!("{}:{}", self.ns, qname),
queue.ts.to_string(),
(queue.ts + hidden).to_string(),
"false".to_string(),
)
.await?;

if !result.0 {
Expand Down Expand Up @@ -474,7 +550,7 @@ impl<T: ConnectionLike> RsmqFunctions<T> {
.cmd("TIME")
.query_async(conn)
.await?;

#[cfg(feature = "break-js-comp")]
let time = (result.1).0 * 1000000 + (result.1).1;
#[cfg(not(feature = "break-js-comp"))]
Expand Down
46 changes: 27 additions & 19 deletions src/multiplexed_facade.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::functions::RsmqFunctions;
use crate::functions::{CachedScript, RsmqFunctions};
use crate::r#trait::RsmqConnection;
use crate::types::{RedisBytes, RsmqMessage, RsmqOptions, RsmqQueueAttributes};
use crate::RsmqResult;
Expand All @@ -19,6 +19,7 @@ impl std::fmt::Debug for RedisConnection {
pub struct Rsmq {
connection: RedisConnection,
functions: RsmqFunctions<redis::aio::MultiplexedConnection>,
scripts: CachedScript,
}

impl Rsmq {
Expand All @@ -37,27 +38,28 @@ impl Rsmq {

let connection = client.get_multiplexed_async_connection().await?;

Ok(Rsmq::new_with_connection(
connection,
options.realtime,
Some(&options.ns),
))
Rsmq::new_with_connection(connection, options.realtime, Some(&options.ns)).await
}

/// Special method for when you already have a redis-rs connection and you don't want redis_async to create a new one.
pub fn new_with_connection(
connection: redis::aio::MultiplexedConnection,
pub async fn new_with_connection(
mut connection: redis::aio::MultiplexedConnection,
realtime: bool,
ns: Option<&str>,
) -> Rsmq {
Rsmq {
) -> RsmqResult<Rsmq> {
let functions = RsmqFunctions {
ns: ns.unwrap_or("rsmq").to_string(),
realtime,
conn: PhantomData,
};

let scripts = functions.load_scripts(&mut connection).await?;

Ok(Rsmq {
connection: RedisConnection(connection),
functions: RsmqFunctions {
ns: ns.unwrap_or("rsmq").to_string(),
realtime,
conn: PhantomData,
},
}
functions,
scripts,
})
}
}

Expand All @@ -70,7 +72,13 @@ impl RsmqConnection for Rsmq {
hidden: Duration,
) -> RsmqResult<()> {
self.functions
.change_message_visibility(&mut self.connection.0, qname, message_id, hidden)
.change_message_visibility(
&mut self.connection.0,
qname,
message_id,
hidden,
&self.scripts,
)
.await
}

Expand Down Expand Up @@ -111,7 +119,7 @@ impl RsmqConnection for Rsmq {
qname: &str,
) -> RsmqResult<Option<RsmqMessage<E>>> {
self.functions
.pop_message::<E>(&mut self.connection.0, qname)
.pop_message::<E>(&mut self.connection.0, qname, &self.scripts)
.await
}

Expand All @@ -121,7 +129,7 @@ impl RsmqConnection for Rsmq {
hidden: Option<Duration>,
) -> RsmqResult<Option<RsmqMessage<E>>> {
self.functions
.receive_message::<E>(&mut self.connection.0, qname, hidden)
.receive_message::<E>(&mut self.connection.0, qname, hidden, &self.scripts)
.await
}

Expand Down
Loading

0 comments on commit ba14653

Please sign in to comment.