Skip to content

Commit

Permalink
hide time precision changes under feature flag to preserve js compat
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBM committed Jul 12, 2024
1 parent 57139d9 commit 122efe9
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 8 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,4 @@ default = ["tokio-comp", "sync"]
sync = ["tokio"]
tokio-comp = ["redis/tokio-comp"]
async-std-comp = ["redis/async-std-comp"]
break-js-comp = []
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,17 @@ workers for new messages with SUBSCRIBE to prevent multiple simultaneous
If you enable the `sync` feature, you can import a `RsmqSync` object with sync
versions of the methods.

## Time Precision

By default this library keeps compatibility with the JS counterpart. If you require
sub-second precision or are sending many messages very close together and require to
keep track of them with more precision than one second, you can enable the feature
`break-js-comp` like this on your `Cargo.toml`

```toml
rsmq_async = { version = "11", features = [ "break-js-comp" ] }
```

## Guarantees

If you want to implement "at least one delivery" guarantee, you need to receive
Expand Down
24 changes: 16 additions & 8 deletions src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ lazy_static! {

const JS_COMPAT_MAX_TIME_MILLIS: u64 = 9_999_999_000;

#[cfg(feature = "break-js-comp")]
const TIME_MULTIPLIER: u64 = 1000;
#[cfg(not(feature = "break-js-comp"))]
const TIME_MULTIPLIER: u64 = 1;

/// The main object of this library. Creates/Handles the redis connection and contains all the methods
#[derive(Clone)]
pub struct RsmqFunctions<T: ConnectionLike> {
Expand Down Expand Up @@ -133,7 +138,7 @@ impl<T: ConnectionLike> RsmqFunctions<T> {
redis::cmd("SADD")
.arg(format!("{}:QUEUES", self.ns))
.arg(qname)
.query_async(conn)
.query_async::<_, ()>(conn)
.await?;

Ok(())
Expand Down Expand Up @@ -212,7 +217,7 @@ impl<T: ConnectionLike> RsmqFunctions<T> {
.arg(&key)
.cmd("ZCOUNT")
.arg(&key)
.arg(time.0 * 1000)
.arg(time.0 * TIME_MULTIPLIER)
.arg("+inf")
.query_async(conn)
.await?;
Expand Down Expand Up @@ -382,7 +387,7 @@ impl<T: ConnectionLike> RsmqFunctions<T> {
redis::cmd("PUBLISH")
.arg(format!("{}:rt:{}", self.ns, qname))
.arg(result[3])
.query_async(conn)
.query_async::<_, ()>(conn)
.await?;
}

Expand Down Expand Up @@ -453,7 +458,7 @@ impl<T: ConnectionLike> RsmqFunctions<T> {
.arg(maxsize);
}

commands.query_async(conn).await?;
commands.query_async::<_, ()>(conn).await?;

self.get_queue_attributes(conn, qname).await
}
Expand All @@ -469,8 +474,11 @@ impl<T: ConnectionLike> RsmqFunctions<T> {
.cmd("TIME")
.query_async(conn)
.await?;

let time_micros = (result.1).0 * 1000000 + (result.1).1;

#[cfg(feature = "break-js-comp")]
let time = (result.1).0 * 1000000 + (result.1).1;
#[cfg(not(feature = "break-js-comp"))]
let time = (result.1).0 * 1000;

let (hmget_first, hmget_second, hmget_third) =
match (result.0.first(), result.0.get(1), result.0.get(2)) {
Expand All @@ -479,7 +487,7 @@ impl<T: ConnectionLike> RsmqFunctions<T> {
};

let quid = if uid {
Some(radix_36(time_micros).to_string() + &RsmqFunctions::<T>::make_id(22)?)
Some(radix_36(time).to_string() + &RsmqFunctions::<T>::make_id(22)?)
} else {
None
};
Expand All @@ -494,7 +502,7 @@ impl<T: ConnectionLike> RsmqFunctions<T> {
maxsize: hmget_third
.parse()
.map_err(|_| RsmqError::CannotParseMaxsize)?,
ts: time_micros / 1000,
ts: time / TIME_MULTIPLIER,
uid: quid,
})
}
Expand Down
11 changes: 11 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,17 @@
//! the Redis SUBSCRIBE command to be notified of new messages and issue a `receiveMessage` then. However make sure not
//! to listen with multiple workers for new messages with SUBSCRIBE to prevent multiple simultaneous `receiveMessage`
//! calls.
//!
//! ## Time Precision
//!
//! By default this library keeps compatibility with the JS counterpart. If you require
//! sub-second precision or are sending many messages very close together and require to
//! keep track of them with more precision than one second, you can enable the feature
//! `break-js-comp` like this on your `Cargo.toml`
//!
//! ```toml
//! rsmq_async = { version = "11", features = [ "break-js-comp" ] }
//! ```
//!
//! ## Guarantees
//!
Expand Down
1 change: 1 addition & 0 deletions tests/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,7 @@ fn change_queue_size() {
})
}

#[cfg(feature = "break-js-comp")]
#[test]
fn sent_messages_must_keep_order() {
let rt = tokio::runtime::Runtime::new().unwrap();
Expand Down

0 comments on commit 122efe9

Please sign in to comment.