-
Notifications
You must be signed in to change notification settings - Fork 17
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for Redis 5.0.x X* commands #43
Comments
Hey @richfitz, first of all thanks for the excellent package. It has been a huge leg up. Second, we find ourselves using the X* functions via XREADGROUP <- function(r, group, consumer, streams, count=NULL, block=NULL, noack=FALSE){
args <- list("XREADGROUP", "GROUP", group, consumer)
if (!is.null(count)){
args <- c(args, c("COUNT", count))
}
if (!is.null(block)){
args <- c(args, c("BLOCK", block))
}
if (noack){
args <- c(args, "NOACK")
}
args <- c(args, "STREAMS")
args <- c(args, names(streams))
args <- c(args, streams)
redis_response <- r$command(args)
if (is.null(redis_response)){
# No new messages
return(redis_response)
}
# redis_response is a list of streams:
# each stream is a list with two items:
# 1. A Stream Name
# 2. A list of messages where each messages is a list with two items:
# 1. Message id
# 2. A list of key-value pairs that alternates key, value, key, value...
out <- list()
for (stream_section_raw in redis_response){
stream_section_out <- list()
stream_name <- stream_section_raw[[1]]
stream_section_out[["stream"]] <- stream_name
messages <- list()
for (i in 1:length(stream_section_raw[[2]])){
message_raw <- stream_section_raw[[2]][[i]]
# The payload is a list that alternates key, value, key, value...
keys <- message_raw[[2]][c(TRUE, FALSE)]
values <- message_raw[[2]][c(FALSE, TRUE)]
names(values) <- keys
message_out <- list(
id=message_raw[[1]],
payload=values
)
messages[[i]] <- message_out
}
stream_section_out[['messages']] <- messages
out[[stream_name]] <- stream_section_out
}
return(out)
}
XADD <- function(r, stream, values, id="*", maxlength=NULL){
args = list("XADD", stream)
if (!is.null(maxlength)){
args <- c(args, "MAXLEN", "~", maxlength)
}
args <- c(args, id)
for (field in names(values)){
args <- c(args, field, values[[field]])
}
return(r$command(args))
}
XACK <- function(r, stream, group, ids){
args = list("XACK", stream, group)
args <- c(args, ids)
return(r$command(args))
}
### Usage
# Create the consumer group "r-worker" on the stream "r-test"
r$command(list("XGROUP", "CREATE", "r-test", "r-worker", "$", "MKSTREAM"))
# Add messages to the "r-test" stream
x1 <- XADD(r, "r-test", list("value1"="apples", "value2"="something"))
x2 <- XADD(r, "r-test", list("value1"="bananas", "value2"="something else"))
# Read messages from the "r-test" stream from consumer group "r-worker" as consumer "consumer1"
y <- XREADGROUP(r, "r-worker", "consumer1", list("r-test"=">", "r-test2"=">"))
print(x[['r-test']]$messages[[1]]$payload$value1) # Prints 'apples'
print(x[['r-test']]$messages[[2]]$payload$value1) # Prints 'bananas'
# Acknowledge the messages
z <- XACK(r, "r-test", "r-worker", c(x1, x2)) |
@MaxTaggart thank you for the head start and examples. @richfitz if you still need help here I would be interested in contributing as I need to create Redis Stream interface for communication between a web application and R models. |
Omitted from #41 as the basic interface generated did not seem usable. This needs input from someone who is using these.
Note that these can be manually used using
$command()
The text was updated successfully, but these errors were encountered: