Skip to content

Commit

Permalink
[WIP] Event loop refactoring (ray-project#19)
Browse files Browse the repository at this point in the history
* task queue tests and extensions

* event loop refactor

* fix formating
  • Loading branch information
pcmoritz authored and robertnishihara committed Sep 24, 2016
1 parent e1b8711 commit 7907992
Show file tree
Hide file tree
Showing 18 changed files with 1,615 additions and 353 deletions.
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
CC = gcc
CFLAGS = -g -Wall --std=c99 -D_XOPEN_SOURCE=500 -D_POSIX_C_SOURCE=200809L -fPIC -I. -Ithirdparty
CFLAGS = -g -Wall --std=c99 -D_XOPEN_SOURCE=500 -D_POSIX_C_SOURCE=200809L -fPIC -I. -Ithirdparty -Ithirdparty/ae
BUILD = build

all: $(BUILD)/libcommon.a

$(BUILD)/libcommon.a: event_loop.o common.o task.o io.o state/redis.o
$(BUILD)/libcommon.a: event_loop.o common.o task.o io.o state/redis.o thirdparty/ae/ae.o
ar rcs $@ $^

$(BUILD)/common_tests: test/common_tests.c $(BUILD)/libcommon.a
Expand All @@ -23,7 +23,7 @@ $(BUILD)/redis_tests: hiredis test/redis_tests.c $(BUILD)/libcommon.a logging.h
$(CC) -o $@ test/redis_tests.c logging.c $(BUILD)/libcommon.a thirdparty/hiredis/libhiredis.a $(CFLAGS)

clean:
rm -f *.o state/*.o test/*.o
rm -f *.o state/*.o test/*.o thirdparty/ae/*.o
rm -rf $(BUILD)/*

redis:
Expand Down
2 changes: 2 additions & 0 deletions common.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#ifndef COMMON_H
#define COMMON_H

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>

Expand Down
124 changes: 44 additions & 80 deletions event_loop.c
Original file line number Diff line number Diff line change
@@ -1,98 +1,62 @@
#include "event_loop.h"

#include <assert.h>
#include "common.h"
#include <errno.h>

UT_icd item_icd = {sizeof(event_loop_item), NULL, NULL, NULL};
UT_icd poll_icd = {sizeof(struct pollfd), NULL, NULL, NULL};
#define INITIAL_EVENT_LOOP_SIZE 1024

/* Initializes the event loop.
* This function needs to be called before any other event loop function. */
void event_loop_init(event_loop *loop) {
utarray_new(loop->items, &item_icd);
utarray_new(loop->waiting, &poll_icd);
event_loop *event_loop_create() {
return aeCreateEventLoop(INITIAL_EVENT_LOOP_SIZE);
}

/* Free the space associated to the event loop.
* Does not free the event_loop datastructure itself. */
void event_loop_free(event_loop *loop) {
utarray_free(loop->items);
utarray_free(loop->waiting);
}

/* Add a new file descriptor fd to the event loop.
* This function sets a user defined type and id for the file descriptor
* which can be queried using event_loop_type and event_loop_id. The parameter
* events is the same as in http://linux.die.net/man/2/poll.
* Returns the index of the item in the event loop. */
int64_t event_loop_attach(event_loop *loop,
int type,
void *data,
int fd,
int events) {
assert(utarray_len(loop->items) == utarray_len(loop->waiting));
int64_t index = utarray_len(loop->items);
event_loop_item item = {.type = type, .data = data};
utarray_push_back(loop->items, &item);
struct pollfd waiting = {.fd = fd, .events = events};
utarray_push_back(loop->waiting, &waiting);
return index;
}

/* Detach a file descriptor from the event loop.
* This invalidates all other indices into the event loop items, but leaves
* the ids of the event loop items valid. */
void event_loop_detach(event_loop *loop, int64_t index, int shall_close) {
struct pollfd *waiting_item =
(struct pollfd *) utarray_eltptr(loop->waiting, index);
struct pollfd *waiting_back = (struct pollfd *) utarray_back(loop->waiting);
if (shall_close) {
close(waiting_item->fd);
void event_loop_destroy(event_loop *loop) {
/* Clean up timer events. This is to make valgrind happy. */
aeTimeEvent *te = loop->timeEventHead;
while (te) {
aeTimeEvent *next = te->next;
free(te);
te = next;
}
*waiting_item = *waiting_back;
utarray_pop_back(loop->waiting);

event_loop_item *items_item =
(event_loop_item *) utarray_eltptr(loop->items, index);
event_loop_item *items_back = (event_loop_item *) utarray_back(loop->items);
*items_item = *items_back;
utarray_pop_back(loop->items);
}

/* Poll the file descriptors associated to this event loop.
* See http://linux.die.net/man/2/poll. The timeout is in milliseconds. */
int event_loop_poll(event_loop *loop, int timeout) {
return poll((struct pollfd *) utarray_front(loop->waiting),
utarray_len(loop->waiting), timeout);
aeDeleteEventLoop(loop);
}

void event_loop_add_file(event_loop *loop,
int fd,
int events,
event_loop_file_handler handler,
void *context) {
/* Try to add the file descriptor. */
int err = aeCreateFileEvent(loop, fd, events, handler, context);
/* If it cannot be added, increase the size of the event loop. */
if (err == AE_ERR && errno == ERANGE) {
err = aeResizeSetSize(loop, 3 * aeGetSetSize(loop) / 2);
CHECK(err == AE_OK);
err = aeCreateFileEvent(loop, fd, events, handler, context);
}
/* In any case, test if there were errors. */
CHECK(err == AE_OK);
}

/* Get the total number of file descriptors participating in the event loop. */
int64_t event_loop_size(event_loop *loop) {
return utarray_len(loop->waiting);
void event_loop_remove_file(event_loop *loop, int fd) {
aeDeleteFileEvent(loop, fd, EVENT_LOOP_READ | EVENT_LOOP_WRITE);
}

/* Get the pollfd structure associated to a file descriptor participating in the
* event loop. */
struct pollfd *event_loop_get(event_loop *loop, int64_t index) {
return (struct pollfd *) utarray_eltptr(loop->waiting, index);
int64_t event_loop_add_timer(event_loop *loop,
int64_t milliseconds,
event_loop_timer_handler handler,
void *context) {
return aeCreateTimeEvent(loop, milliseconds, handler, context, NULL);
}

/* Set the data connection information for participant in the event loop. */
void event_loop_set_data(event_loop *loop, int64_t index, void *data) {
event_loop_item *item =
(event_loop_item *) utarray_eltptr(loop->items, index);
item->data = data;
void event_loop_remove_timer(event_loop *loop, int64_t id) {
int err = aeDeleteTimeEvent(loop, id);
CHECK(err == AE_OK); /* timer id found? */
}

/* Get the data connection information for participant in the event loop. */
void *event_loop_get_data(event_loop *loop, int64_t index) {
event_loop_item *item =
(event_loop_item *) utarray_eltptr(loop->items, index);
return item->data;
void event_loop_run(event_loop *loop) {
aeMain(loop);
}

/* Return the type of connection. */
int event_loop_type(event_loop *loop, int64_t index) {
event_loop_item *item =
(event_loop_item *) utarray_eltptr(loop->items, index);
return item->type;
void event_loop_stop(event_loop *loop) {
aeStop(loop);
}
99 changes: 67 additions & 32 deletions event_loop.h
Original file line number Diff line number Diff line change
@@ -1,39 +1,74 @@
#ifndef EVENT_LOOP_H
#define EVENT_LOOP_H

#include <poll.h>
#include <stdint.h>
#include "ae/ae.h"

#include "utarray.h"

typedef struct {
/* The type of connection (e.g. redis, client, manager, data transfer). */
int type;
/* Data associated with the connection (managed by the user) */
void *data;
} event_loop_item;

typedef struct {
/* Array of event_loop_items that hold information for connections. */
UT_array *items;
/* Array of file descriptors that are waiting, corresponding to items. */
UT_array *waiting;
} event_loop;

/* Event loop functions. */
void event_loop_init(event_loop *loop);
void event_loop_free(event_loop *loop);
int64_t event_loop_attach(event_loop *loop,
int type,
void *data,
int fd,
int events);
void event_loop_detach(event_loop *loop, int64_t index, int shall_close);
int event_loop_poll(event_loop *loop, int timeout);
int64_t event_loop_size(event_loop *loop);
struct pollfd *event_loop_get(event_loop *loop, int64_t index);
void event_loop_set_data(event_loop *loop, int64_t index, void *data);
void *event_loop_get_data(event_loop *loop, int64_t index);
int event_loop_type(event_loop *loop, int64_t index);
typedef aeEventLoop event_loop;

/* File descriptor is readable. */
#define EVENT_LOOP_READ AE_READABLE

/* File descriptor is writable. */
#define EVENT_LOOP_WRITE AE_WRITABLE

/* Signature of the handler that will be called when there is a new event
* on the file descriptor that this handler has been registered for. The
* context is the one that was passed into add_file by the user. The
* events parameter indicates which event is available on the file,
* it can be EVENT_LOOP_READ or EVENT_LOOP_WRITE. */
typedef void (*event_loop_file_handler)(event_loop *loop,
int fd,
void *context,
int events);

/* This handler will be called when a timer times out. The id of the timer
* as well as the context that was specified when registering this handler
* are passed as arguments. */
typedef int64_t (*event_loop_timer_handler)(event_loop *loop,
int64_t id,
void *context);

/* Create and return a new event loop. */
event_loop *event_loop_create();

/* Deallocate space associated with the event loop that was created
* with the "create" function. */
void event_loop_destroy(event_loop *loop);

/* Register a handler that will be called any time a new event happens on
* a file descriptor. Can specify a context that will be passed as an
* argument to the handler. Currently there can only be one handler per file.
* The events parameter specifies which events we listen to: EVENT_LOOP_READ
* or EVENT_LOOP_WRITE. */
void event_loop_add_file(event_loop *loop,
int fd,
int events,
event_loop_file_handler handler,
void *context);

/* Remove a registered file event handler from the event loop. */
void event_loop_remove_file(event_loop *loop, int fd);

/* Register a handler that will be called after a time slice of
* "milliseconds" milliseconds. Can specify a context that will be passed
* as an argument to the handler. Return the id of the time event. */
int64_t event_loop_add_timer(event_loop *loop,
int64_t milliseconds,
event_loop_timer_handler handler,
void *context);

/* Reset the timer timeout to a given number of milliseconds.
* NOTE: This is not implemented yet. */
void event_loop_reset_timer(event_loop *loop, int64_t id, int64_t milliseconds);

/* Remove a registered time event handler from the event loop. */
void event_loop_remove_timer(event_loop *loop, int64_t id);

/* Run the event loop. */
void event_loop_run(event_loop *loop);

/* Stop the event loop. */
void event_loop_stop(event_loop *loop);

#endif
9 changes: 2 additions & 7 deletions state/db.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,8 @@ void db_connect(const char *db_address,
int client_port,
db_conn *db);

/* Attach global system store onnection to event loop. Returns the index of the
* connection in the loop. */
int64_t db_attach(db_conn *db, event_loop *loop, int connection_type);

/* This function will be called by the user if there is a new event in the
* event loop associated with the global system store connection. */
void db_event(db_conn *db);
/* Attach global system store connection to event loop. */
void db_attach(db_conn *db, event_loop *loop);

/* Disconnect from the global system store. */
void db_disconnect(db_conn *db);
Expand Down
67 changes: 4 additions & 63 deletions state/redis.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

#include <assert.h>

#include <stdlib.h>
#include "hiredis/adapters/ae.h"
#include "utstring.h"

#include "common.h"
Expand All @@ -12,38 +14,6 @@
#include "redis.h"
#include "io.h"

static void poll_add_read(void *privdata) {
db_conn *conn = (db_conn *) privdata;
if (!conn->reading) {
conn->reading = 1;
event_loop_get(conn->loop, conn->db_index)->events |= POLLIN;
}
}

static void poll_del_read(void *privdata) {
db_conn *conn = (db_conn *) privdata;
if (conn->reading) {
conn->reading = 0;
event_loop_get(conn->loop, conn->db_index)->events &= ~POLLIN;
}
}

static void poll_add_write(void *privdata) {
db_conn *conn = (db_conn *) privdata;
if (!conn->writing) {
conn->writing = 1;
event_loop_get(conn->loop, conn->db_index)->events |= POLLOUT;
}
}

static void poll_del_write(void *privdata) {
db_conn *conn = (db_conn *) privdata;
if (conn->writing) {
conn->writing = 0;
event_loop_get(conn->loop, conn->db_index)->events &= ~POLLOUT;
}
}

#define LOG_REDIS_ERR(context, M, ...) \
fprintf(stderr, "[ERROR] (%s:%d: message: %s) " M "\n", __FILE__, __LINE__, \
context->errstr, ##__VA_ARGS__)
Expand Down Expand Up @@ -119,37 +89,8 @@ void db_disconnect(db_conn *db) {
free(db->client_type);
}

void db_event(db_conn *db) {
if (db->reading) {
redisAsyncHandleRead(db->context);
}
if (db->writing) {
redisAsyncHandleWrite(db->context);
}
}

int64_t db_attach(db_conn *db, event_loop *loop, int connection_type) {
db->loop = loop;

redisAsyncContext *ac = db->context;
redisContext *c = &(ac->c);

if (ac->ev.data != NULL) {
return REDIS_ERR;
}

ac->ev.addRead = poll_add_read;
ac->ev.delRead = poll_del_read;
ac->ev.addWrite = poll_add_write;
ac->ev.delWrite = poll_del_write;
// TODO(pcm): Implement cleanup function

ac->ev.data = db;

int64_t index =
event_loop_attach(loop, connection_type, NULL, c->fd, POLLIN | POLLOUT);
db->db_index = index;
return index;
void db_attach(db_conn *db, event_loop *loop) {
redisAeAttach(loop, db->context);
}

void object_table_add(db_conn *db, unique_id object_id) {
Expand Down
2 changes: 1 addition & 1 deletion state/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ typedef struct {
/* Unique ID for this service. */
int service_id;
/* IP address and port of this service. */
const char *addr;
char *addr;
/* Handle for the uthash table. */
UT_hash_handle hh;
} service_cache_entry;
Expand Down
Loading

0 comments on commit 7907992

Please sign in to comment.