Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Redis 5.0.x X* commands #43

Open
richfitz opened this issue Dec 1, 2020 · 2 comments
Open

Add support for Redis 5.0.x X* commands #43

richfitz opened this issue Dec 1, 2020 · 2 comments

Comments

@richfitz
Copy link
Owner

richfitz commented Dec 1, 2020

Omitted from #41 as the basic interface generated did not seem usable. This needs input from someone who is using these.

Note that these can be manually used using $command()

con$command(list("XINFO", "STREAM", "mystream"))
@acksmaggart
Copy link

acksmaggart commented Jun 9, 2021

Hey @richfitz, first of all thanks for the excellent package. It has been a huge leg up. Second, we find ourselves using the X* functions via con$command as you suggest. I don't have the bandwidth to be very involved, but in case it is helpful and/or for anyone else coming across this issue here are some code snippets that implement rough versions of XADD, XACK, and XREADGROUP. They all take a redis connection as their first argument. Let me know if there is any other context or info that would be helpful.

XREADGROUP <- function(r, group, consumer, streams, count=NULL, block=NULL, noack=FALSE){
  args <- list("XREADGROUP", "GROUP", group, consumer)
  if (!is.null(count)){
    args <- c(args, c("COUNT", count))
  }
  if (!is.null(block)){
    args <- c(args, c("BLOCK", block))
  } 
  if (noack){
    args <- c(args, "NOACK")
  }
  args <- c(args, "STREAMS")
  args <- c(args, names(streams))
  args <- c(args, streams)
  redis_response <- r$command(args)
  if (is.null(redis_response)){
    # No new messages
    return(redis_response)
  }
  # redis_response is a list of streams:
    # each stream is a list with two items:
      # 1. A Stream Name
      # 2. A list of messages where each messages is a list with two items:
        # 1. Message id
        # 2. A list of key-value pairs that alternates key, value, key, value...
  out <- list()
  for (stream_section_raw in redis_response){
    stream_section_out <- list()
    stream_name <- stream_section_raw[[1]]
    stream_section_out[["stream"]] <- stream_name
    messages <- list()
    for (i in 1:length(stream_section_raw[[2]])){
      message_raw <- stream_section_raw[[2]][[i]]
      # The payload is a list that alternates key, value, key, value...
      keys <- message_raw[[2]][c(TRUE, FALSE)]
      values <- message_raw[[2]][c(FALSE, TRUE)]
      names(values) <- keys
      message_out <- list(
        id=message_raw[[1]],
        payload=values
      )
      messages[[i]] <- message_out
    }
    stream_section_out[['messages']] <- messages
    out[[stream_name]] <- stream_section_out
  }
  return(out)
}

XADD <- function(r, stream, values, id="*", maxlength=NULL){
  args = list("XADD", stream)
  if (!is.null(maxlength)){
    args <- c(args, "MAXLEN", "~", maxlength)
  }
  args <- c(args, id)
  for (field in names(values)){
    args <- c(args, field, values[[field]])
  }
  return(r$command(args))
}


XACK <- function(r, stream, group, ids){
  args = list("XACK", stream, group)
  args <- c(args, ids)
  return(r$command(args))
}

### Usage
# Create the consumer group "r-worker" on the stream "r-test"
r$command(list("XGROUP", "CREATE", "r-test", "r-worker", "$", "MKSTREAM"))
# Add messages to the "r-test" stream
x1 <- XADD(r, "r-test", list("value1"="apples", "value2"="something"))
x2 <- XADD(r, "r-test", list("value1"="bananas", "value2"="something else"))
# Read messages from the "r-test" stream from consumer group "r-worker" as consumer "consumer1"
y <- XREADGROUP(r, "r-worker", "consumer1", list("r-test"=">", "r-test2"=">"))
print(x[['r-test']]$messages[[1]]$payload$value1) # Prints 'apples'
print(x[['r-test']]$messages[[2]]$payload$value1) # Prints 'bananas'
# Acknowledge the messages
z <- XACK(r, "r-test", "r-worker", c(x1, x2))

@skadaman
Copy link

skadaman commented Jan 7, 2023

@MaxTaggart thank you for the head start and examples. @richfitz if you still need help here I would be interested in contributing as I need to create Redis Stream interface for communication between a web application and R models.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants