Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experimental conversion control #61

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Package: redux
Title: R Bindings to 'hiredis'
Version: 1.1.4
Version: 1.1.5
Authors@R: c(person("Rich", "FitzJohn", role = c("aut", "cre"),
email = "[email protected]"))
Description: A 'hiredis' wrapper that includes support for
Expand Down
4 changes: 2 additions & 2 deletions R/connection.R
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ redis_connection <- function(config = redis_config()) {
invisible()
},

command = function(cmd) {
redis_command(ptr, cmd)
command = function(cmd, as = NULL) {
redis_command(ptr, cmd, as)
},

pipeline = function(cmds) {
Expand Down
4 changes: 2 additions & 2 deletions R/redis.R
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ redis_connect_unix <- function(path, timeout = NULL) {
.Call(Credux_redis_connect_unix, path, as.integer(timeout))
}

redis_command <- function(ptr, command) {
.Call(Credux_redis_command, ptr, command)
redis_command <- function(ptr, command, as = NULL) {
.Call(Credux_redis_command, ptr, command, as)
}

redis_pipeline <- function(ptr, list) {
Expand Down
8 changes: 5 additions & 3 deletions src/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,18 @@ SEXP redux_redis_connect_unix(SEXP r_path, SEXP r_timeout) {
return extPtr;
}

SEXP redux_redis_command(SEXP extPtr, SEXP cmd) {
SEXP redux_redis_command(SEXP extPtr, SEXP cmd, SEXP r_as) {
redisContext *context = redis_get_context(extPtr, true);

reply_string_as as = r_reply_string_as(r_as);

cmd = PROTECT(redis_check_command(cmd));
const char **argv = NULL;
size_t *argvlen = NULL;
const size_t argc = sexp_to_redis(cmd, &argv, &argvlen);

redisReply *reply = redisCommandArgv(context, argc, argv, argvlen);
SEXP ret = PROTECT(redis_reply_to_sexp(reply, true));
SEXP ret = PROTECT(redis_reply_to_sexp(reply, true, as));
freeReplyObject(reply);
UNPROTECT(2);
return ret;
Expand Down Expand Up @@ -94,7 +96,7 @@ SEXP redux_redis_pipeline(SEXP extPtr, SEXP list) {
SEXP ret = PROTECT(allocVector(VECSXP, nc));
for (size_t i = 0; i < nc; ++i) {
redisGetReply(context, (void*)&reply);
SET_VECTOR_ELT(ret, i, redis_reply_to_sexp(reply, false));
SET_VECTOR_ELT(ret, i, redis_reply_to_sexp(reply, false, AS_AUTO));
freeReplyObject(reply);
}
UNPROTECT(2);
Expand Down
2 changes: 1 addition & 1 deletion src/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
SEXP redux_redis_connect(SEXP host, SEXP port, SEXP timeout);
SEXP redux_redis_connect_unix(SEXP path, SEXP timeout);

SEXP redux_redis_command(SEXP extPtr, SEXP cmd);
SEXP redux_redis_command(SEXP extPtr, SEXP cmd, SEXP r_as);

SEXP redux_redis_pipeline(SEXP extPtr, SEXP list);

Expand Down
32 changes: 25 additions & 7 deletions src/conversions.c
Original file line number Diff line number Diff line change
@@ -1,22 +1,40 @@
#include "conversions.h"

SEXP redis_reply_to_sexp(redisReply* reply, bool error_throw) {
reply_string_as r_reply_string_as(SEXP r_as) {
if (r_as == R_NilValue) {
return AS_AUTO;
}

if (TYPEOF(r_as) == STRSXP && length(r_as) == 1) {
if (strcmp(CHAR(STRING_ELT(r_as, 0)), "raw") == 0) {
return AS_RAW;
} else if (strcmp(CHAR(STRING_ELT(r_as, 0)), "auto") == 0) {
return AS_AUTO;
}
}

Rf_error("Invalid option for 'as'");
return AS_AUTO; // #nocov
}

SEXP redis_reply_to_sexp(redisReply* reply, bool error_throw,
reply_string_as as) {
if (reply == NULL) {
error("Failure communicating with the Redis server");
}
switch(reply->type) {
case REDIS_REPLY_STATUS:
return status_to_sexp(reply->str);
case REDIS_REPLY_STRING:
return raw_string_to_sexp(reply->str, reply->len);
return raw_string_to_sexp(reply->str, reply->len, as);
case REDIS_REPLY_INTEGER:
return (reply->integer < INT_MAX ?
ScalarInteger(reply->integer) :
ScalarReal((double)reply->integer));
case REDIS_REPLY_NIL:
return R_NilValue;
case REDIS_REPLY_ARRAY:
return array_to_sexp(reply, error_throw);
return array_to_sexp(reply, error_throw, as);
case REDIS_REPLY_ERROR:
return reply_error(reply, error_throw);
default:
Expand Down Expand Up @@ -225,7 +243,7 @@ bool is_raw_string(const char* str, size_t len) {
return false;
}

SEXP raw_string_to_sexp(const char* str, size_t len) {
SEXP raw_string_to_sexp(const char* str, size_t len, reply_string_as as) {
// There are different approaches here to detecting a raw string; we
// can test for presence of a nul byte, but that involves a
// traversal of _every_ string. It really should be corect though
Expand All @@ -234,7 +252,7 @@ SEXP raw_string_to_sexp(const char* str, size_t len) {
// The strategy here is to check for a serialised object, then
// assume a string, but fall back on re-encoding as RAW (with an
// extra copy) if a nul byte is found
bool is_raw = is_raw_string(str, len);
bool is_raw = as == AS_RAW || is_raw_string(str, len);
SEXP ret;
if (is_raw) {
ret = PROTECT(allocVector(RAWSXP, len));
Expand All @@ -261,12 +279,12 @@ SEXP status_to_sexp(const char* str) {
return ret;
}

SEXP array_to_sexp(redisReply* reply, bool error_throw) {
SEXP array_to_sexp(redisReply* reply, bool error_throw, reply_string_as as) {
SEXP ret = PROTECT(allocVector(VECSXP, reply->elements));
size_t i;
for (i = 0; i < reply->elements; ++i) {
SET_VECTOR_ELT(ret, i,
redis_reply_to_sexp(reply->element[i], error_throw));
redis_reply_to_sexp(reply->element[i], error_throw, as));
}
UNPROTECT(1);
return ret;
Expand Down
14 changes: 11 additions & 3 deletions src/conversions.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,21 @@
#include <hiredis.h>
#include <stdbool.h>

typedef enum {
AS_AUTO,
AS_RAW
} reply_string_as;

reply_string_as r_reply_string_as(SEXP as);

/* whole reply */
SEXP redis_reply_to_sexp(redisReply* reply, bool error_throw);
SEXP redis_reply_to_sexp(redisReply* reply, bool error_throw,
reply_string_as as);

/* possible bits of a reply */
SEXP raw_string_to_sexp(const char* s, size_t len);
SEXP raw_string_to_sexp(const char* s, size_t len, reply_string_as as);
SEXP status_to_sexp(const char* s);
SEXP array_to_sexp(redisReply* reply, bool error_throw);
SEXP array_to_sexp(redisReply* reply, bool error_throw, reply_string_as as);
SEXP reply_error(redisReply* reply, bool error_throw);

/* detection */
Expand Down
2 changes: 1 addition & 1 deletion src/registration.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ static const R_CallMethodDef callMethods[] = {
{"Credux_redis_connect", (DL_FUNC) &redux_redis_connect, 3},
{"Credux_redis_connect_unix", (DL_FUNC) &redux_redis_connect_unix, 2},

{"Credux_redis_command", (DL_FUNC) &redux_redis_command, 2},
{"Credux_redis_command", (DL_FUNC) &redux_redis_command, 3},

{"Credux_redis_pipeline", (DL_FUNC) &redux_redis_pipeline, 2},

Expand Down
6 changes: 3 additions & 3 deletions src/subscribe.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ SEXP redux_redis_subscribe(SEXP extPtr, SEXP channel, SEXP pattern,
SET_VECTOR_ELT(cmd, 0, mkString(p ? "PSUBSCRIBE" : "SUBSCRIBE"));
SET_VECTOR_ELT(cmd, 1, channel);
cmd = PROTECT(redis_check_command(cmd));
SEXP ret = PROTECT(redux_redis_command(extPtr, cmd));
SEXP ret = PROTECT(redux_redis_command(extPtr, cmd, R_NilValue));

redux_redis_subscribe_loop(redis_get_context(extPtr, true),
p, callback, envir);
Expand Down Expand Up @@ -39,7 +39,7 @@ void redux_redis_subscribe_loop(redisContext* context, int pattern,
while (keep_going) {
R_CheckUserInterrupt();
redisGetReply(context, (void*)&reply);
SEXP x = PROTECT(redis_reply_to_sexp(reply, false));
SEXP x = PROTECT(redis_reply_to_sexp(reply, false, AS_AUTO));
setAttrib(x, R_NamesSymbol, nms);
SETCADR(call, x);
freeReplyObject(reply);
Expand Down Expand Up @@ -90,7 +90,7 @@ SEXP redux_redis_unsubscribe(SEXP extPtr, SEXP channel, SEXP pattern) {
n_discarded++;
redisGetReply(context, (void*)&reply);
}
SEXP ret = PROTECT(redis_reply_to_sexp(reply, true));
SEXP ret = PROTECT(redis_reply_to_sexp(reply, true, AS_AUTO));
freeReplyObject(reply);
if (n_discarded > 0) {
SEXP key = PROTECT(mkString("n_discarded"));
Expand Down
27 changes: 27 additions & 0 deletions tests/testthat/test-redis.R
Original file line number Diff line number Diff line change
Expand Up @@ -176,3 +176,30 @@ test_that("pointer commands are safe", {
expect_error(redis_command(ptr_null, "PING"),
"Context is not connected")
})


test_that("can get raw strings back if asked", {
skip_if_no_redis()
ptr <- redis_connect_tcp(REDIS_HOST, REDIS_PORT)
key <- rand_str()
value <- rand_str()

expect_equal(redis_command(ptr, list("SET", key, value)),
redis_status("OK"))

expect_equal(redis_command(ptr, list("GET", key), "auto"),
value)
expect_equal(redis_command(ptr, list("GET", key), "raw"),
charToRaw(value))

expect_error(redis_command(ptr, list("GET", key), "other"),
"Invalid option for 'as'")
expect_error(redis_command(ptr, list("GET", key), TRUE),
"Invalid option for 'as'")
expect_error(redis_command(ptr, list("GET", key), letters),
"Invalid option for 'as'")
expect_error(redis_command(ptr, list("GET", key), character()),
"Invalid option for 'as'")

expect_equal(redis_command(ptr, c("DEL", key)), 1L)
})
Loading