Skip to content

Commit

Permalink
Redis logging (ray-project#17)
Browse files Browse the repository at this point in the history
* Redis logging

* Rearrange logging interfaces

* Fix test case

* Changes to logging interface and test case for logging

* Fixes

* Fix memory leaks

* Add interface method to destroy logger

* is_local -> is_direct

* Merge fix
  • Loading branch information
pcmoritz authored Sep 24, 2016
1 parent 7a07954 commit e1b8711
Show file tree
Hide file tree
Showing 8 changed files with 217 additions and 34 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ $(BUILD)/io_tests: test/io_tests.c $(BUILD)/libcommon.a
$(BUILD)/task_tests: test/task_tests.c $(BUILD)/libcommon.a
$(CC) -o $@ $^ $(CFLAGS)

$(BUILD)/redis_tests: hiredis test/redis_tests.c $(BUILD)/libcommon.a
$(CC) -o $@ test/redis_tests.c $(BUILD)/libcommon.a thirdparty/hiredis/libhiredis.a $(CFLAGS)
$(BUILD)/redis_tests: hiredis test/redis_tests.c $(BUILD)/libcommon.a logging.h
$(CC) -o $@ test/redis_tests.c logging.c $(BUILD)/libcommon.a thirdparty/hiredis/libhiredis.a $(CFLAGS)

clean:
rm -f *.o state/*.o test/*.o
Expand Down
15 changes: 15 additions & 0 deletions io.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
#include <string.h>
#include <stdio.h>
#include <inttypes.h>
#include <stdarg.h>
#include <utstring.h>

#include "common.h"

Expand Down Expand Up @@ -139,3 +141,16 @@ char *read_string(int fd) {
read_bytes(fd, &bytes, &length);
return (char *) bytes;
}

void write_formatted_string(int socket_fd, const char *format, ...) {
UT_string *cmd;
va_list ap;

utstring_new(cmd);
va_start(ap, format);
utstring_printf_va(cmd, format, ap);
va_end(ap);

write_string(socket_fd, utstring_body(cmd));
utstring_free(cmd);
}
1 change: 1 addition & 0 deletions io.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ void write_bytes(int fd, uint8_t *bytes, int64_t length);
void read_bytes(int fd, uint8_t **bytes, int64_t *length);

void write_string(int fd, char *message);
void write_formatted_string(int fd, const char *format, ...);
char *read_string(int fd);

#endif
78 changes: 78 additions & 0 deletions logging.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
#include "logging.h"

#include <hiredis/hiredis.h>
#include <utstring.h>

#include "state/redis.h"
#include "io.h"

static const char *log_levels[5] = {"DEBUG", "INFO", "WARN", "ERROR", "FATAL"};
static const char *log_fmt =
"HMSET log:%s:%s:%s log_level %s event_type %s message %s timestamp %s";

struct ray_logger_impl {
/* String that identifies this client type. */
char *client_type;
/* Suppress all log messages below this level. */
int log_level;
/* Whether or not we have a direct connection to Redis. */
int is_direct;
/* Either a db_conn or a socket to a process with a db_conn,
* depending on the is_direct flag. */
void *conn;
};

ray_logger *init_ray_logger(const char *client_type,
int log_level,
int is_direct,
void *conn) {
ray_logger *logger = malloc(sizeof(ray_logger));
logger->client_type = client_type;
logger->log_level = log_level;
logger->is_direct = is_direct;
logger->conn = conn;
return logger;
}

void free_ray_logger(ray_logger *logger) {
free(logger);
}

void ray_log(ray_logger *logger,
int log_level,
const char *event_type,
const char *message) {
if (log_level < logger->log_level) {
return;
}
if (log_level < RAY_DEBUG || log_level > RAY_FATAL) {
return;
}
struct timeval tv;
UT_string *timestamp;
utstring_new(timestamp);
gettimeofday(&tv, NULL);
utstring_printf(timestamp, "%ld.%ld", tv.tv_sec, tv.tv_usec);

UT_string *origin_id;
utstring_new(origin_id);
if (logger->is_direct) {
db_conn *db = (db_conn *) logger->conn;
utstring_printf(origin_id, "%ld:%s", db->client_id, "");
redisAsyncCommand(db->context, NULL, NULL, log_fmt,
utstring_body(timestamp), logger->client_type,
utstring_body(origin_id), log_levels[log_level],
event_type, message, utstring_body(timestamp));
} else {
/* If we don't own a Redis connection, we leave our client
* ID to be filled in by someone else. */
utstring_printf(origin_id, "%s:%s", "%ld", "%ld");
int *socket_fd = (int *) logger->conn;
write_formatted_string(*socket_fd, log_fmt, utstring_body(timestamp),
logger->client_type, utstring_body(origin_id),
log_levels[log_level], event_type, message,
utstring_body(timestamp));
}
utstring_free(origin_id);
utstring_free(timestamp);
}
39 changes: 39 additions & 0 deletions logging.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#ifndef LOGGING_H
#define LOGGING_H

#define RAY_VERBOSE -1
#define RAY_DEBUG 0
#define RAY_INFO 1
#define RAY_WARNING 2
#define RAY_ERROR 3
#define RAY_FATAL 4

/* Entity types. */
#define RAY_FUNCTION "FUNCTION"
#define RAY_OBJECT "OBJECT"
#define RAY_TASK "TASK"

typedef struct ray_logger_impl ray_logger;

/* Initialize a Ray logger for the given client type and logging level. If the
* is_direct flag is set, the logger will treat the given connection as a
* direct connection to the log. Otherwise, it will treat it as a socket to
* another process with a connection to the log.
* NOTE: User is responsible for freeing the returned logger. */
ray_logger *init_ray_logger(const char *client_type,
int log_level,
int is_direct,
void *conn);

/* Free the logger. This does not free the connection to the log. */
void free_ray_logger(ray_logger *logger);

/* Log an event at the given log level with the given event_type.
* NOTE: message cannot contain spaces! JSON format is recommended.
* TODO: Support spaces in messages. */
void ray_log(ray_logger *logger,
int log_level,
const char *event_type,
const char *message);

#endif
20 changes: 0 additions & 20 deletions state/redis.c
Original file line number Diff line number Diff line change
Expand Up @@ -232,23 +232,3 @@ void task_queue_submit_task(db_conn *db, task_iid task_iid, task_spec *task) {
}
utstring_free(command);
}

void send_redis_command(int socket_fd, const char *format, ...) {
char *cmd;
va_list ap;
int len;

va_start(ap, format);
len = redisvFormatCommand(&cmd, format, ap);
va_end(ap);
if (len == -1) {
LOG_ERR("Out of memory while formatting Redis command.");
return;
} else if (len == -2) {
LOG_ERR("Invalid Redis format string.");
return;
}

write_string(socket_fd, cmd);
free(cmd);
}
5 changes: 4 additions & 1 deletion state/redis.h
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
#ifndef REDIS_H
#define REDIS_H

#include "db.h"
#include "object_table.h"

Expand Down Expand Up @@ -47,4 +50,4 @@ void object_table_lookup_callback(redisAsyncContext *c,
void *r,
void *privdata);

void send_redis_command(int socket_fd, const char *format, ...);
#endif
89 changes: 78 additions & 11 deletions test/redis_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "state/db.h"
#include "state/redis.h"
#include "io.h"
#include "logging.h"

SUITE(redis_tests);

Expand All @@ -31,6 +32,16 @@ void async_redis_socket_test_callback(redisAsyncContext *ac,
lookup_successful = 1;
}

void logging_test_callback(redisAsyncContext *ac, void *r, void *privdata) {
redisContext *context = redisConnect("127.0.0.1", 6379);
redisReply *reply = redisCommand(context, "KEYS %s", "log:*");
redisFree(context);
assert(reply != NULL);
assert(reply->elements > 0);
freeReplyObject(reply);
lookup_successful = 1;
}

TEST redis_socket_test(void) {
const char *socket_pathname = "redis-test-socket";
redisContext *context = redisConnect("127.0.0.1", 6379);
Expand All @@ -40,8 +51,7 @@ TEST redis_socket_test(void) {

int client_fd = connect_ipc_sock(socket_pathname);
ASSERT(client_fd >= 0);

send_redis_command(client_fd, test_set_format, test_key, test_value);
write_formatted_string(client_fd, test_set_format, test_key, test_value);

int server_fd = accept_client(socket_fd);
char *cmd = read_string(server_fd);
Expand All @@ -50,11 +60,10 @@ TEST redis_socket_test(void) {
close(socket_fd);
unlink(socket_pathname);

redisAppendFormattedCommand(context, cmd, strlen(cmd));
redisReply *tmp;
redisGetReply(context, &tmp);
freeReplyObject(tmp);
redisReply *reply = redisCommand(context, "GET %s", test_key);
redisReply *reply;
reply = redisCommand(context, cmd, 0, 0);
freeReplyObject(reply);
reply = redisCommand(context, "GET %s", test_key);
ASSERT(reply != NULL);
ASSERT_STR_EQ(reply->str, test_value);
freeReplyObject(reply);
Expand Down Expand Up @@ -82,7 +91,64 @@ TEST async_redis_socket_test(void) {
/* Send a command to the Redis process. */
client_fd = connect_ipc_sock(socket_pathname);
ASSERT(client_fd >= 0);
send_redis_command(client_fd, test_set_format, test_key, test_value);
write_formatted_string(client_fd, test_set_format, test_key, test_value);

while (!lookup_successful) {
int num_ready = event_loop_poll(&loop, -1);
if (num_ready < 0) {
exit(-1);
}
for (int i = 0; i < event_loop_size(&loop); ++i) {
struct pollfd *waiting = event_loop_get(&loop, i);
if (waiting->revents == 0)
continue;
if (i == db_index) {
db_event(&conn);
} else if (i == ipc_index) {
/* For some reason, this check is necessary for Travis
* to pass these tests. */
ASSERT(waiting->revents & POLLIN);
server_fd = accept_client(socket_fd);
ASSERT(server_fd >= 0);
event_loop_attach(&loop, 1, NULL, server_fd, POLLIN);
} else {
char *cmd = read_string(waiting->fd);
redisAsyncCommand(conn.context, async_redis_socket_test_callback, NULL,
cmd, conn.client_id, 0);
free(cmd);
}
}
}
db_disconnect(&conn);
event_loop_free(&loop);
close(server_fd);
close(client_fd);
close(socket_fd);
unlink(socket_pathname);
lookup_successful = 0;
PASS();
}

TEST logging_test(void) {
int socket_fd, server_fd, client_fd;
event_loop loop;
event_loop_init(&loop);
/* Start IPC channel. */
const char *socket_pathname = "logging-test-socket";
socket_fd = bind_ipc_sock(socket_pathname);
ASSERT(socket_fd >= 0);
int64_t ipc_index = event_loop_attach(&loop, 1, NULL, socket_fd, POLLIN);

/* Start connection to Redis. */
db_conn conn;
db_connect("127.0.0.1", 6379, "", "", 0, &conn);
int64_t db_index = db_attach(&conn, &loop, 0);

/* Send a command to the Redis process. */
client_fd = connect_ipc_sock(socket_pathname);
ASSERT(client_fd >= 0);
ray_logger *logger = init_ray_logger("worker", RAY_INFO, 0, &client_fd);
ray_log(logger, RAY_INFO, "TEST", "Message");

while (!lookup_successful) {
int num_ready = event_loop_poll(&loop, -1);
Expand All @@ -104,13 +170,13 @@ TEST async_redis_socket_test(void) {
event_loop_attach(&loop, 1, NULL, server_fd, POLLIN);
} else {
char *cmd = read_string(waiting->fd);
redisAsyncFormattedCommand(conn.context,
async_redis_socket_test_callback, NULL, cmd,
strlen(cmd));
redisAsyncCommand(conn.context, logging_test_callback, NULL, cmd,
conn.client_id, 0);
free(cmd);
}
}
}
free_ray_logger(logger);
db_disconnect(&conn);
event_loop_free(&loop);
close(server_fd);
Expand All @@ -126,6 +192,7 @@ SUITE(redis_tests) {
freeReplyObject(redisCommand(context, "FLUSHALL"));
RUN_REDIS_TEST(context, redis_socket_test);
RUN_REDIS_TEST(context, async_redis_socket_test);
RUN_REDIS_TEST(context, logging_test);
redisFree(context);
}

Expand Down

0 comments on commit e1b8711

Please sign in to comment.