Skip to content

Commit

Permalink
API for task log and scheduled task (ray-project#25)
Browse files Browse the repository at this point in the history
* API revision

* update

* make status a bitmap

* update api

* tests working

* new task log APIs

* update APIs

* write binary structures to redis

* update tests

* fix clang-format

* Fix formatting.
  • Loading branch information
pcmoritz authored and robertnishihara committed Sep 30, 2016
1 parent 084220b commit e21e9f6
Show file tree
Hide file tree
Showing 12 changed files with 306 additions and 214 deletions.
63 changes: 3 additions & 60 deletions common.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
#include <sys/stat.h>
#include <fcntl.h>

const unique_id NIL_ID = {{255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
255, 255, 255, 255, 255, 255, 255, 255, 255, 255}};

unique_id globally_unique_id(void) {
/* Use /dev/urandom for "real" randomness. */
int fd;
Expand All @@ -31,63 +34,3 @@ char *sha1_to_hex(const unsigned char *sha1, char *buffer) {

return buffer;
}

const signed char hexval_table[256] = {
-1, -1, -1, -1, -1, -1, -1, -1, /* 00-07 */
-1, -1, -1, -1, -1, -1, -1, -1, /* 08-0f */
-1, -1, -1, -1, -1, -1, -1, -1, /* 10-17 */
-1, -1, -1, -1, -1, -1, -1, -1, /* 18-1f */
-1, -1, -1, -1, -1, -1, -1, -1, /* 20-27 */
-1, -1, -1, -1, -1, -1, -1, -1, /* 28-2f */
+0, +1, +2, +3, +4, +5, +6, +7, /* 30-37 */
+8, +9, -1, -1, -1, -1, -1, -1, /* 38-3f */
-1, 10, 11, 12, 13, 14, 15, -1, /* 40-47 */
-1, -1, -1, -1, -1, -1, -1, -1, /* 48-4f */
-1, -1, -1, -1, -1, -1, -1, -1, /* 50-57 */
-1, -1, -1, -1, -1, -1, -1, -1, /* 58-5f */
-1, 10, 11, 12, 13, 14, 15, -1, /* 60-67 */
-1, -1, -1, -1, -1, -1, -1, -1, /* 68-67 */
-1, -1, -1, -1, -1, -1, -1, -1, /* 70-77 */
-1, -1, -1, -1, -1, -1, -1, -1, /* 78-7f */
-1, -1, -1, -1, -1, -1, -1, -1, /* 80-87 */
-1, -1, -1, -1, -1, -1, -1, -1, /* 88-8f */
-1, -1, -1, -1, -1, -1, -1, -1, /* 90-97 */
-1, -1, -1, -1, -1, -1, -1, -1, /* 98-9f */
-1, -1, -1, -1, -1, -1, -1, -1, /* a0-a7 */
-1, -1, -1, -1, -1, -1, -1, -1, /* a8-af */
-1, -1, -1, -1, -1, -1, -1, -1, /* b0-b7 */
-1, -1, -1, -1, -1, -1, -1, -1, /* b8-bf */
-1, -1, -1, -1, -1, -1, -1, -1, /* c0-c7 */
-1, -1, -1, -1, -1, -1, -1, -1, /* c8-cf */
-1, -1, -1, -1, -1, -1, -1, -1, /* d0-d7 */
-1, -1, -1, -1, -1, -1, -1, -1, /* d8-df */
-1, -1, -1, -1, -1, -1, -1, -1, /* e0-e7 */
-1, -1, -1, -1, -1, -1, -1, -1, /* e8-ef */
-1, -1, -1, -1, -1, -1, -1, -1, /* f0-f7 */
-1, -1, -1, -1, -1, -1, -1, -1, /* f8-ff */
};

static inline unsigned int hexval(unsigned char c) {
return hexval_table[c];
}

/*
* Convert two consecutive hexadecimal digits into a char. Return a
* negative value on error. Don't run over the end of short strings.
*/
static inline int hex2chr(const char *s) {
int val = hexval(s[0]);
return (val < 0) ? val : (val << 4) | hexval(s[1]);
}

int hex_to_sha1(const char *hex, unsigned char *sha1) {
int i;
for (i = 0; i < UNIQUE_ID_SIZE; i++) {
int val = hex2chr(hex);
if (val < 0)
return -1;
*sha1++ = val;
hex += 2;
}
return 0;
}
7 changes: 2 additions & 5 deletions common.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@

typedef struct { unsigned char id[UNIQUE_ID_SIZE]; } unique_id;

extern const unique_id NIL_ID;

/* Generate a globally unique ID. */
unique_id globally_unique_id(void);

Expand All @@ -46,11 +48,6 @@ unique_id globally_unique_id(void);
* UNIQUE_ID_SIZE + 1 */
char *sha1_to_hex(const unsigned char *sha1, char *buffer);

/* Convert a hexdecimal string of length 40 to a 20 byte sha1 hash. This
* function assumes that sha1 points to an already allocated char array of size
* UNIQUE_ID_SIZE. */
int hex_to_sha1(const char *hex, unsigned char *sha1);

typedef unique_id object_id;

#endif
99 changes: 77 additions & 22 deletions state/redis.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
#include "common.h"
#include "db.h"
#include "object_table.h"
#include "task_queue.h"
#include "task_log.h"
#include "event_loop.h"
#include "redis.h"
#include "io.h"
Expand Down Expand Up @@ -65,41 +65,51 @@ db_handle *db_connect(const char *address,

db->client_type = strdup(client_type);
db->client_id = num_clients;
db->reading = 0;
db->writing = 0;
db->service_cache = NULL;
db->sync_context = context;
utarray_new(db->callback_freelist, &ut_ptr_icd);

/* Establish async connection */
db->context = redisAsyncConnect(address, port);
CHECK_REDIS_CONNECT(redisAsyncContext, db->context,
"could not connect to redis %s:%d", address, port);
db->context->data = (void *) db;
/* Establish async connection for subscription */
db->sub_context = redisAsyncConnect(address, port);
CHECK_REDIS_CONNECT(redisAsyncContext, db->sub_context,
"could not connect to redis %s:%d", address, port);
db->sub_context->data = (void *) db;

return db;
}

void db_disconnect(db_handle *db) {
redisFree(db->sync_context);
redisAsyncFree(db->context);
redisAsyncFree(db->sub_context);
service_cache_entry *e, *tmp;
HASH_ITER(hh, db->service_cache, e, tmp) {
free(e->addr);
HASH_DEL(db->service_cache, e);
free(e);
}
free(db->client_type);
void **p = NULL;
while ((p = (void **) utarray_next(db->callback_freelist, p))) {
free(*p);
}
utarray_free(db->callback_freelist);
free(db);
}

void db_attach(db_handle *db, event_loop *loop) {
redisAeAttach(loop, db->context);
redisAeAttach(loop, db->sub_context);
}

void object_table_add(db_handle *db, unique_id object_id) {
static char hex_object_id[2 * UNIQUE_ID_SIZE + 1];
sha1_to_hex(&object_id.id[0], &hex_object_id[0]);
redisAsyncCommand(db->context, NULL, NULL, "SADD obj:%s %d",
&hex_object_id[0], db->client_id);
redisAsyncCommand(db->context, NULL, NULL, "SADD obj:%b %d", &object_id.id[0],
UNIQUE_ID_SIZE, db->client_id);
if (db->context->err) {
LOG_REDIS_ERR(db->context, "could not add object_table entry");
}
Expand Down Expand Up @@ -148,30 +158,75 @@ void object_table_get_entry(redisAsyncContext *c, void *r, void *privdata) {
void object_table_lookup(db_handle *db,
object_id object_id,
lookup_callback callback) {
static char hex_object_id[2 * UNIQUE_ID_SIZE + 1];
sha1_to_hex(&object_id.id[0], &hex_object_id[0]);
lookup_callback_data *cb_data = malloc(sizeof(lookup_callback_data));
cb_data->callback = callback;
cb_data->object_id = object_id;
redisAsyncCommand(db->context, object_table_get_entry, cb_data,
"SMEMBERS obj:%s", &hex_object_id[0]);
"SMEMBERS obj:%b", &object_id.id[0], UNIQUE_ID_SIZE);
if (db->context->err) {
LOG_REDIS_ERR(db->context, "error in object_table lookup");
}
}

void task_queue_submit_task(db_handle *db, task_iid task_iid, task_spec *task) {
/* For converting an id to hex, which has double the number
* of bytes compared to the id (+ 1 byte for '\0'). */
static char hex[2 * UNIQUE_ID_SIZE + 1];
UT_string *command;
utstring_new(command);
sha1_to_hex(&task_iid.id[0], &hex[0]);
utstring_printf(command, "HMSET queue:%s ", &hex[0]);
print_task(task, command);
redisAsyncCommand(db->context, NULL, NULL, utstring_body(command));
void task_log_add_task(db_handle *db, task_instance *task_instance) {
task_iid task_iid = *task_instance_id(task_instance);
redisAsyncCommand(db->context, NULL, NULL, "HMSET tasklog:%b 0 %b",
(char *) &task_iid.id[0], UNIQUE_ID_SIZE,
(char *) task_instance, task_instance_size(task_instance));
if (db->context->err) {
LOG_REDIS_ERR(db->context, "error setting task in task_log_add_task");
}
node_id node = *task_instance_node(task_instance);
int32_t state = *task_instance_state(task_instance);
redisAsyncCommand(db->context, NULL, NULL, "PUBLISH task_log:%b:%d %b",
(char *) &node.id[0], UNIQUE_ID_SIZE, state,
(char *) task_instance, task_instance_size(task_instance));
if (db->context->err) {
LOG_REDIS_ERR(db->context, "error in task_queue submit_task");
LOG_REDIS_ERR(db->context, "error publishing task in task_log_add_task");
}
}

void task_log_redis_callback(redisAsyncContext *c,
void *reply,
void *privdata) {
redisReply *r = reply;
if (reply == NULL)
return;
CHECK(r->type == REDIS_REPLY_ARRAY);
/* First entry is message type, second is topic, third is payload. */
CHECK(r->elements > 2);
/* If this condition is true, we got the initial message that acknowledged the
* subscription. */
if (r->element[2]->str == NULL) {
return;
}
/* Otherwise, parse the task and call the callback. */
CHECK(privdata);
task_log_callback_data *callback_data = privdata;
task_instance *instance = malloc(r->element[2]->len);
memcpy(instance, r->element[2]->str, r->element[2]->len);
callback_data->callback(instance, callback_data->userdata);
task_instance_free(instance);
}
void task_log_register_callback(db_handle *db,
task_log_callback callback,
node_id node,
int32_t state,
void *userdata) {
task_log_callback_data *callback_data =
malloc(sizeof(task_log_callback_data));
utarray_push_back(db->callback_freelist, &callback_data);
callback_data->callback = callback;
callback_data->userdata = userdata;
if (memcmp(&node.id[0], &NIL_ID.id[0], UNIQUE_ID_SIZE) == 0) {
redisAsyncCommand(db->sub_context, task_log_redis_callback, callback_data,
"PSUBSCRIBE task_log:*:%d", state);
} else {
redisAsyncCommand(db->sub_context, task_log_redis_callback, callback_data,
"SUBSCRIBE task_log:%b:%d", (char *) &node.id[0],
UNIQUE_ID_SIZE, state);
}
if (db->sub_context->err) {
LOG_REDIS_ERR(db->sub_context, "error in task_log_register_callback");
}
utstring_free(command);
}
17 changes: 15 additions & 2 deletions state/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@

#include "db.h"
#include "object_table.h"
#include "task_log.h"

#include "hiredis/hiredis.h"
#include "hiredis/async.h"
#include "uthash.h"
#include "utarray.h"

typedef struct {
/* Unique ID for this service. */
Expand All @@ -17,15 +19,24 @@ typedef struct {
UT_hash_handle hh;
} service_cache_entry;

typedef struct {
/* The callback that will be called. */
task_log_callback callback;
/* Userdata associated with the callback. */
void *userdata;
} task_log_callback_data;

struct db_handle_impl {
/* String that identifies this client type. */
char *client_type;
/* Unique ID for this client within the type. */
int64_t client_id;
/* Redis context for this global state store connection. */
redisAsyncContext *context;
/* Which events are we processing (read, write)? */
int reading, writing;
/* Redis context for "subscribe" communication.
* Yes, we need a separate one for that, see
* https://github.com/redis/hiredis/issues/55 */
redisAsyncContext *sub_context;
/* The event loop this global state store connection is part of. */
event_loop *loop;
/* Index of the database connection in the event loop */
Expand All @@ -35,6 +46,8 @@ struct db_handle_impl {
/* Redis context for synchronous connections.
* Should only be used very rarely, it is not asynchronous. */
redisContext *sync_context;
/* Data structure for callbacks that needs to be freed. */
UT_array *callback_freelist;
};

typedef struct {
Expand Down
41 changes: 41 additions & 0 deletions state/task_log.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#ifndef TASK_LOG_H
#define TASK_LOG_H

#include "db.h"
#include "task.h"

/* The task log is a message bus that is used for all communication between
* local and global schedulers (and also persisted to the state database).
* Here are examples of events that are recorded by the task log:
*
* 1) local scheduler writes it when submits a task to the global scheduler;
* 2) global scheduler reads it to get the task submitted by local schedulers;
* 3) global scheduler writes it when assigning the task to a local scheduler;
* 4) local scheduler reads it to get its tasks assigned by global scheduler;
* 5) local scheduler writes it when a task finishes execution;
* 6) global scheduler reads it to get the tasks that have finished; */

/* Callback for subscribing to the task log. */
typedef void (*task_log_callback)(task_instance *task_instance, void *userdata);

/* Initially add a task instance to the task log. */
void task_log_add_task(db_handle *db, task_instance *task_instance);

/* Update task instance in the task log. */
void task_log_update_task(db_handle *db,
task_iid task_iid,
int32_t state,
node_id node);

/* Register callback for a certain event. The node specifies the node whose
* events we want to listen to. If you want to listen to all events for this
* node, use state_filter =
* TASK_WAITING | TASK_SCHEDULED | TASK_RUNNING | TASK_DONE.
* If you want to register to updates from all nodes, set node = NIL_ID. */
void task_log_register_callback(db_handle *db,
task_log_callback callback,
node_id node,
int32_t state_filter,
void *userdata);

#endif /* TASK_LOG_H */
33 changes: 0 additions & 33 deletions state/task_queue.h

This file was deleted.

11 changes: 9 additions & 2 deletions state/task_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,16 @@
#include "task.h"

/* Add task to the task table, handle errors here. */
status task_table_add_task(db_handle *db, task_iid task_iid, task_spec *task);
status task_table_add_task(db_handle *db, task_spec *task);

/* Callback for getting an entry from the task table. Task spec will be freed
* by the system after the callback */
typedef void (*task_table_callback)(task_spec *task, void *context);

/* Get specific task from the task table. */
status task_table_get_task(db_handle *db, task_iid task_iid, task_spec *task);
status task_table_get_task(db_handle *db,
task_id task_id,
task_table_callback callback,
void *context);

#endif /* TASK_TABLE_H */
Loading

0 comments on commit e21e9f6

Please sign in to comment.