Skip to content

Commit

Permalink
Append : to ns (#20)
Browse files Browse the repository at this point in the history
  • Loading branch information
t4sk authored May 8, 2024
1 parent 7deb19a commit 709a7d8
Showing 1 changed file with 13 additions and 13 deletions.
26 changes: 13 additions & 13 deletions src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl<T: ConnectionLike> RsmqFunctions<T> {
number_in_range(hidden, 0, JS_COMPAT_MAX_TIME_MILLIS)?;

CHANGE_MESSAGE_VISIVILITY
.key(format!("{}{}", self.ns, qname))
.key(format!("{}:{}", self.ns, qname))
.key(message_id)
.key(queue.ts + hidden)
.invoke_async::<_, bool>(conn)
Expand All @@ -77,7 +77,7 @@ impl<T: ConnectionLike> RsmqFunctions<T> {
) -> RsmqResult<()> {
valid_name_format(qname)?;

let key = format!("{}{}:Q", self.ns, qname);
let key = format!("{}:{}:Q", self.ns, qname);
let hidden = get_redis_duration(hidden, &Duration::from_secs(30));
let delay = get_redis_duration(delay, &Duration::ZERO);
let maxsize = maxsize.unwrap_or(65536);
Expand Down Expand Up @@ -131,7 +131,7 @@ impl<T: ConnectionLike> RsmqFunctions<T> {
}

redis::cmd("SADD")
.arg(format!("{}QUEUES", self.ns))
.arg(format!("{}:QUEUES", self.ns))
.arg(qname)
.query_async(conn)
.await?;
Expand All @@ -143,7 +143,7 @@ impl<T: ConnectionLike> RsmqFunctions<T> {
///
/// Important to use when you are using receive_message.
pub async fn delete_message(&self, conn: &mut T, qname: &str, id: &str) -> RsmqResult<bool> {
let key = format!("{}{}", self.ns, qname);
let key = format!("{}:{}", self.ns, qname);

let results: (u16, u16) = pipe()
.atomic()
Expand All @@ -167,15 +167,15 @@ impl<T: ConnectionLike> RsmqFunctions<T> {

/// Deletes the queue and all the messages on it
pub async fn delete_queue(&self, conn: &mut T, qname: &str) -> RsmqResult<()> {
let key = format!("{}{}", self.ns, qname);
let key = format!("{}:{}", self.ns, qname);

let results: (u16, u16) = pipe()
.atomic()
.cmd("DEL")
.arg(format!("{}:Q", &key))
.arg(key)
.cmd("SREM")
.arg(format!("{}QUEUES", self.ns))
.arg(format!("{}:QUEUES", self.ns))
.arg(qname)
.query_async(conn)
.await?;
Expand All @@ -193,7 +193,7 @@ impl<T: ConnectionLike> RsmqFunctions<T> {
conn: &mut T,
qname: &str,
) -> RsmqResult<RsmqQueueAttributes> {
let key = format!("{}{}", self.ns, qname);
let key = format!("{}:{}", self.ns, qname);

let time: (u64, u64) = redis::cmd("TIME").query_async(conn).await?;

Expand Down Expand Up @@ -249,7 +249,7 @@ impl<T: ConnectionLike> RsmqFunctions<T> {
/// Returns a list of queues in the namespace
pub async fn list_queues(&self, conn: &mut T) -> RsmqResult<Vec<String>> {
let queues = redis::cmd("SMEMBERS")
.arg(format!("{}QUEUES", self.ns))
.arg(format!("{}:QUEUES", self.ns))
.query_async(conn)
.await?;

Expand All @@ -265,7 +265,7 @@ impl<T: ConnectionLike> RsmqFunctions<T> {
let queue = self.get_queue(conn, qname, false).await?;

let result: (bool, String, Vec<u8>, u64, u64) = POP_MESSAGE
.key(format!("{}{}", self.ns, qname))
.key(format!("{}:{}", self.ns, qname))
.key(queue.ts)
.invoke_async(conn)
.await?;
Expand Down Expand Up @@ -300,7 +300,7 @@ impl<T: ConnectionLike> RsmqFunctions<T> {
number_in_range(hidden, 0, JS_COMPAT_MAX_TIME_MILLIS)?;

let result: (bool, String, Vec<u8>, u64, u64) = RECEIVE_MESSAGE
.key(format!("{}{}", self.ns, qname))
.key(format!("{}:{}", self.ns, qname))
.key(queue.ts)
.key(queue.ts + hidden)
.invoke_async(conn)
Expand Down Expand Up @@ -332,7 +332,7 @@ impl<T: ConnectionLike> RsmqFunctions<T> {
let queue = self.get_queue(conn, qname, true).await?;

let delay = get_redis_duration(delay, &queue.delay);
let key = format!("{}{}", self.ns, qname);
let key = format!("{}:{}", self.ns, qname);

number_in_range(delay, 0, JS_COMPAT_MAX_TIME_MILLIS)?;

Expand Down Expand Up @@ -406,7 +406,7 @@ impl<T: ConnectionLike> RsmqFunctions<T> {
) -> RsmqResult<RsmqQueueAttributes> {
self.get_queue(conn, qname, false).await?;

let queue_name = format!("{}{}:Q", self.ns, qname);
let queue_name = format!("{}:{}:Q", self.ns, qname);

let time: (u64, u64) = redis::cmd("TIME").query_async(conn).await?;

Expand Down Expand Up @@ -462,7 +462,7 @@ impl<T: ConnectionLike> RsmqFunctions<T> {
let result: (Vec<Option<String>>, (u64, u64)) = pipe()
.atomic()
.cmd("HMGET")
.arg(format!("{}{}:Q", self.ns, qname))
.arg(format!("{}:{}:Q", self.ns, qname))
.arg("vt")
.arg("delay")
.arg("maxsize")
Expand Down

0 comments on commit 709a7d8

Please sign in to comment.