diff --git a/Makefile b/Makefile index aa5ee292..f7127311 100644 --- a/Makefile +++ b/Makefile @@ -1,10 +1,10 @@ CC = gcc -CFLAGS = -g -Wall --std=c99 -D_XOPEN_SOURCE=500 -D_POSIX_C_SOURCE=200809L -fPIC -I. -Ithirdparty +CFLAGS = -g -Wall --std=c99 -D_XOPEN_SOURCE=500 -D_POSIX_C_SOURCE=200809L -fPIC -I. -Ithirdparty -Ithirdparty/ae BUILD = build all: $(BUILD)/libcommon.a -$(BUILD)/libcommon.a: event_loop.o common.o task.o io.o state/redis.o +$(BUILD)/libcommon.a: event_loop.o common.o task.o io.o state/redis.o thirdparty/ae/ae.o ar rcs $@ $^ $(BUILD)/common_tests: test/common_tests.c $(BUILD)/libcommon.a @@ -23,7 +23,7 @@ $(BUILD)/redis_tests: hiredis test/redis_tests.c $(BUILD)/libcommon.a logging.h $(CC) -o $@ test/redis_tests.c logging.c $(BUILD)/libcommon.a thirdparty/hiredis/libhiredis.a $(CFLAGS) clean: - rm -f *.o state/*.o test/*.o + rm -f *.o state/*.o test/*.o thirdparty/ae/*.o rm -rf $(BUILD)/* redis: diff --git a/common.h b/common.h index 7e4b73eb..5444739b 100644 --- a/common.h +++ b/common.h @@ -1,6 +1,8 @@ #ifndef COMMON_H #define COMMON_H +#include +#include #include #include diff --git a/event_loop.c b/event_loop.c index d89710ba..6928705e 100644 --- a/event_loop.c +++ b/event_loop.c @@ -1,98 +1,62 @@ #include "event_loop.h" -#include +#include "common.h" +#include -UT_icd item_icd = {sizeof(event_loop_item), NULL, NULL, NULL}; -UT_icd poll_icd = {sizeof(struct pollfd), NULL, NULL, NULL}; +#define INITIAL_EVENT_LOOP_SIZE 1024 -/* Initializes the event loop. - * This function needs to be called before any other event loop function. */ -void event_loop_init(event_loop *loop) { - utarray_new(loop->items, &item_icd); - utarray_new(loop->waiting, &poll_icd); +event_loop *event_loop_create() { + return aeCreateEventLoop(INITIAL_EVENT_LOOP_SIZE); } -/* Free the space associated to the event loop. - * Does not free the event_loop datastructure itself. */ -void event_loop_free(event_loop *loop) { - utarray_free(loop->items); - utarray_free(loop->waiting); -} - -/* Add a new file descriptor fd to the event loop. - * This function sets a user defined type and id for the file descriptor - * which can be queried using event_loop_type and event_loop_id. The parameter - * events is the same as in http://linux.die.net/man/2/poll. - * Returns the index of the item in the event loop. */ -int64_t event_loop_attach(event_loop *loop, - int type, - void *data, - int fd, - int events) { - assert(utarray_len(loop->items) == utarray_len(loop->waiting)); - int64_t index = utarray_len(loop->items); - event_loop_item item = {.type = type, .data = data}; - utarray_push_back(loop->items, &item); - struct pollfd waiting = {.fd = fd, .events = events}; - utarray_push_back(loop->waiting, &waiting); - return index; -} - -/* Detach a file descriptor from the event loop. - * This invalidates all other indices into the event loop items, but leaves - * the ids of the event loop items valid. */ -void event_loop_detach(event_loop *loop, int64_t index, int shall_close) { - struct pollfd *waiting_item = - (struct pollfd *) utarray_eltptr(loop->waiting, index); - struct pollfd *waiting_back = (struct pollfd *) utarray_back(loop->waiting); - if (shall_close) { - close(waiting_item->fd); +void event_loop_destroy(event_loop *loop) { + /* Clean up timer events. This is to make valgrind happy. */ + aeTimeEvent *te = loop->timeEventHead; + while (te) { + aeTimeEvent *next = te->next; + free(te); + te = next; } - *waiting_item = *waiting_back; - utarray_pop_back(loop->waiting); - - event_loop_item *items_item = - (event_loop_item *) utarray_eltptr(loop->items, index); - event_loop_item *items_back = (event_loop_item *) utarray_back(loop->items); - *items_item = *items_back; - utarray_pop_back(loop->items); -} - -/* Poll the file descriptors associated to this event loop. - * See http://linux.die.net/man/2/poll. The timeout is in milliseconds. */ -int event_loop_poll(event_loop *loop, int timeout) { - return poll((struct pollfd *) utarray_front(loop->waiting), - utarray_len(loop->waiting), timeout); + aeDeleteEventLoop(loop); +} + +void event_loop_add_file(event_loop *loop, + int fd, + int events, + event_loop_file_handler handler, + void *context) { + /* Try to add the file descriptor. */ + int err = aeCreateFileEvent(loop, fd, events, handler, context); + /* If it cannot be added, increase the size of the event loop. */ + if (err == AE_ERR && errno == ERANGE) { + err = aeResizeSetSize(loop, 3 * aeGetSetSize(loop) / 2); + CHECK(err == AE_OK); + err = aeCreateFileEvent(loop, fd, events, handler, context); + } + /* In any case, test if there were errors. */ + CHECK(err == AE_OK); } -/* Get the total number of file descriptors participating in the event loop. */ -int64_t event_loop_size(event_loop *loop) { - return utarray_len(loop->waiting); +void event_loop_remove_file(event_loop *loop, int fd) { + aeDeleteFileEvent(loop, fd, EVENT_LOOP_READ | EVENT_LOOP_WRITE); } -/* Get the pollfd structure associated to a file descriptor participating in the - * event loop. */ -struct pollfd *event_loop_get(event_loop *loop, int64_t index) { - return (struct pollfd *) utarray_eltptr(loop->waiting, index); +int64_t event_loop_add_timer(event_loop *loop, + int64_t milliseconds, + event_loop_timer_handler handler, + void *context) { + return aeCreateTimeEvent(loop, milliseconds, handler, context, NULL); } -/* Set the data connection information for participant in the event loop. */ -void event_loop_set_data(event_loop *loop, int64_t index, void *data) { - event_loop_item *item = - (event_loop_item *) utarray_eltptr(loop->items, index); - item->data = data; +void event_loop_remove_timer(event_loop *loop, int64_t id) { + int err = aeDeleteTimeEvent(loop, id); + CHECK(err == AE_OK); /* timer id found? */ } -/* Get the data connection information for participant in the event loop. */ -void *event_loop_get_data(event_loop *loop, int64_t index) { - event_loop_item *item = - (event_loop_item *) utarray_eltptr(loop->items, index); - return item->data; +void event_loop_run(event_loop *loop) { + aeMain(loop); } -/* Return the type of connection. */ -int event_loop_type(event_loop *loop, int64_t index) { - event_loop_item *item = - (event_loop_item *) utarray_eltptr(loop->items, index); - return item->type; +void event_loop_stop(event_loop *loop) { + aeStop(loop); } diff --git a/event_loop.h b/event_loop.h index 840abfd8..bb6afdb9 100644 --- a/event_loop.h +++ b/event_loop.h @@ -1,39 +1,74 @@ #ifndef EVENT_LOOP_H #define EVENT_LOOP_H -#include #include +#include "ae/ae.h" -#include "utarray.h" - -typedef struct { - /* The type of connection (e.g. redis, client, manager, data transfer). */ - int type; - /* Data associated with the connection (managed by the user) */ - void *data; -} event_loop_item; - -typedef struct { - /* Array of event_loop_items that hold information for connections. */ - UT_array *items; - /* Array of file descriptors that are waiting, corresponding to items. */ - UT_array *waiting; -} event_loop; - -/* Event loop functions. */ -void event_loop_init(event_loop *loop); -void event_loop_free(event_loop *loop); -int64_t event_loop_attach(event_loop *loop, - int type, - void *data, - int fd, - int events); -void event_loop_detach(event_loop *loop, int64_t index, int shall_close); -int event_loop_poll(event_loop *loop, int timeout); -int64_t event_loop_size(event_loop *loop); -struct pollfd *event_loop_get(event_loop *loop, int64_t index); -void event_loop_set_data(event_loop *loop, int64_t index, void *data); -void *event_loop_get_data(event_loop *loop, int64_t index); -int event_loop_type(event_loop *loop, int64_t index); +typedef aeEventLoop event_loop; + +/* File descriptor is readable. */ +#define EVENT_LOOP_READ AE_READABLE + +/* File descriptor is writable. */ +#define EVENT_LOOP_WRITE AE_WRITABLE + +/* Signature of the handler that will be called when there is a new event + * on the file descriptor that this handler has been registered for. The + * context is the one that was passed into add_file by the user. The + * events parameter indicates which event is available on the file, + * it can be EVENT_LOOP_READ or EVENT_LOOP_WRITE. */ +typedef void (*event_loop_file_handler)(event_loop *loop, + int fd, + void *context, + int events); + +/* This handler will be called when a timer times out. The id of the timer + * as well as the context that was specified when registering this handler + * are passed as arguments. */ +typedef int64_t (*event_loop_timer_handler)(event_loop *loop, + int64_t id, + void *context); + +/* Create and return a new event loop. */ +event_loop *event_loop_create(); + +/* Deallocate space associated with the event loop that was created + * with the "create" function. */ +void event_loop_destroy(event_loop *loop); + +/* Register a handler that will be called any time a new event happens on + * a file descriptor. Can specify a context that will be passed as an + * argument to the handler. Currently there can only be one handler per file. + * The events parameter specifies which events we listen to: EVENT_LOOP_READ + * or EVENT_LOOP_WRITE. */ +void event_loop_add_file(event_loop *loop, + int fd, + int events, + event_loop_file_handler handler, + void *context); + +/* Remove a registered file event handler from the event loop. */ +void event_loop_remove_file(event_loop *loop, int fd); + +/* Register a handler that will be called after a time slice of + * "milliseconds" milliseconds. Can specify a context that will be passed + * as an argument to the handler. Return the id of the time event. */ +int64_t event_loop_add_timer(event_loop *loop, + int64_t milliseconds, + event_loop_timer_handler handler, + void *context); + +/* Reset the timer timeout to a given number of milliseconds. + * NOTE: This is not implemented yet. */ +void event_loop_reset_timer(event_loop *loop, int64_t id, int64_t milliseconds); + +/* Remove a registered time event handler from the event loop. */ +void event_loop_remove_timer(event_loop *loop, int64_t id); + +/* Run the event loop. */ +void event_loop_run(event_loop *loop); + +/* Stop the event loop. */ +void event_loop_stop(event_loop *loop); #endif diff --git a/state/db.h b/state/db.h index b586f9ac..3fcf658f 100644 --- a/state/db.h +++ b/state/db.h @@ -15,13 +15,8 @@ void db_connect(const char *db_address, int client_port, db_conn *db); -/* Attach global system store onnection to event loop. Returns the index of the - * connection in the loop. */ -int64_t db_attach(db_conn *db, event_loop *loop, int connection_type); - -/* This function will be called by the user if there is a new event in the - * event loop associated with the global system store connection. */ -void db_event(db_conn *db); +/* Attach global system store connection to event loop. */ +void db_attach(db_conn *db, event_loop *loop); /* Disconnect from the global system store. */ void db_disconnect(db_conn *db); diff --git a/state/redis.c b/state/redis.c index ac56206e..9c4b81f0 100644 --- a/state/redis.c +++ b/state/redis.c @@ -2,6 +2,8 @@ #include +#include +#include "hiredis/adapters/ae.h" #include "utstring.h" #include "common.h" @@ -12,38 +14,6 @@ #include "redis.h" #include "io.h" -static void poll_add_read(void *privdata) { - db_conn *conn = (db_conn *) privdata; - if (!conn->reading) { - conn->reading = 1; - event_loop_get(conn->loop, conn->db_index)->events |= POLLIN; - } -} - -static void poll_del_read(void *privdata) { - db_conn *conn = (db_conn *) privdata; - if (conn->reading) { - conn->reading = 0; - event_loop_get(conn->loop, conn->db_index)->events &= ~POLLIN; - } -} - -static void poll_add_write(void *privdata) { - db_conn *conn = (db_conn *) privdata; - if (!conn->writing) { - conn->writing = 1; - event_loop_get(conn->loop, conn->db_index)->events |= POLLOUT; - } -} - -static void poll_del_write(void *privdata) { - db_conn *conn = (db_conn *) privdata; - if (conn->writing) { - conn->writing = 0; - event_loop_get(conn->loop, conn->db_index)->events &= ~POLLOUT; - } -} - #define LOG_REDIS_ERR(context, M, ...) \ fprintf(stderr, "[ERROR] (%s:%d: message: %s) " M "\n", __FILE__, __LINE__, \ context->errstr, ##__VA_ARGS__) @@ -119,37 +89,8 @@ void db_disconnect(db_conn *db) { free(db->client_type); } -void db_event(db_conn *db) { - if (db->reading) { - redisAsyncHandleRead(db->context); - } - if (db->writing) { - redisAsyncHandleWrite(db->context); - } -} - -int64_t db_attach(db_conn *db, event_loop *loop, int connection_type) { - db->loop = loop; - - redisAsyncContext *ac = db->context; - redisContext *c = &(ac->c); - - if (ac->ev.data != NULL) { - return REDIS_ERR; - } - - ac->ev.addRead = poll_add_read; - ac->ev.delRead = poll_del_read; - ac->ev.addWrite = poll_add_write; - ac->ev.delWrite = poll_del_write; - // TODO(pcm): Implement cleanup function - - ac->ev.data = db; - - int64_t index = - event_loop_attach(loop, connection_type, NULL, c->fd, POLLIN | POLLOUT); - db->db_index = index; - return index; +void db_attach(db_conn *db, event_loop *loop) { + redisAeAttach(loop, db->context); } void object_table_add(db_conn *db, unique_id object_id) { diff --git a/state/redis.h b/state/redis.h index 09d66bea..c579e706 100644 --- a/state/redis.h +++ b/state/redis.h @@ -12,7 +12,7 @@ typedef struct { /* Unique ID for this service. */ int service_id; /* IP address and port of this service. */ - const char *addr; + char *addr; /* Handle for the uthash table. */ UT_hash_handle hh; } service_cache_entry; diff --git a/test/db_tests.c b/test/db_tests.c index 5bdbf25d..d9dfcb56 100644 --- a/test/db_tests.c +++ b/test/db_tests.c @@ -6,12 +6,12 @@ #include "test/example_task.h" #include "state/db.h" #include "state/object_table.h" +#include "state/task_queue.h" #include "state/redis.h" #include "task.h" SUITE(db_tests); -int lookup_successful = 0; const char *manager_addr = "127.0.0.1"; int manager_port1 = 12345; int manager_port2 = 12346; @@ -20,20 +20,11 @@ char received_port1[6] = {0}; char received_addr2[16] = {0}; char received_port2[6] = {0}; -/* This is for synchronizing to make sure both entries have been written. */ -void sync_test_callback(object_id object_id, - int manager_count, - const char *manager_vector[]) { - lookup_successful = 1; - free(manager_vector); -} - -/* This performs the actual test. */ +/* Test if entries have been written to the database. */ void test_callback(object_id object_id, int manager_count, const char *manager_vector[]) { CHECK(manager_count == 2); - lookup_successful = 1; if (!manager_vector[0] || sscanf(manager_vector[0], "%15[0-9.]:%5[0-9]", received_addr1, received_port1) != 2) { @@ -47,57 +38,29 @@ void test_callback(object_id object_id, free(manager_vector); } +int64_t timeout_handler(event_loop *loop, int64_t id, void *context) { + event_loop_stop(loop); + return -1; +} + TEST object_table_lookup_test(void) { - event_loop loop; - event_loop_init(&loop); + event_loop *loop = event_loop_create(); db_conn conn1; db_connect("127.0.0.1", 6379, "plasma_manager", manager_addr, manager_port1, &conn1); db_conn conn2; db_connect("127.0.0.1", 6379, "plasma_manager", manager_addr, manager_port2, &conn2); - int64_t index1 = db_attach(&conn1, &loop, 0); - int64_t index2 = db_attach(&conn2, &loop, 1); + db_attach(&conn1, loop); + db_attach(&conn2, loop); unique_id id = globally_unique_id(); object_table_add(&conn1, id); object_table_add(&conn2, id); - object_table_lookup(&conn1, id, sync_test_callback); - while (!lookup_successful) { - int num_ready = event_loop_poll(&loop, -1); - if (num_ready < 0) { - exit(-1); - } - for (int i = 0; i < event_loop_size(&loop); ++i) { - struct pollfd *waiting = event_loop_get(&loop, i); - if (waiting->revents == 0) - continue; - if (i == index1) { - db_event(&conn1); - } - if (i == index2) { - db_event(&conn2); - } - } - } - lookup_successful = 0; + event_loop_add_timer(loop, 100, timeout_handler, NULL); + event_loop_run(loop); object_table_lookup(&conn1, id, test_callback); - while (!lookup_successful) { - int num_ready = event_loop_poll(&loop, -1); - if (num_ready < 0) { - exit(-1); - } - for (int i = 0; i < event_loop_size(&loop); ++i) { - struct pollfd *waiting = event_loop_get(&loop, i); - if (waiting->revents == 0) - continue; - if (i == index1) { - db_event(&conn1); - } - if (i == index2) { - db_event(&conn2); - } - } - } + event_loop_add_timer(loop, 100, timeout_handler, NULL); + event_loop_run(loop); int port1 = atoi(received_port1); int port2 = atoi(received_port2); ASSERT_STR_EQ(&received_addr1[0], manager_addr); @@ -107,50 +70,32 @@ TEST object_table_lookup_test(void) { db_disconnect(&conn1); db_disconnect(&conn2); - event_loop_free(&loop); - - lookup_successful = 0; + event_loop_destroy(loop); PASS(); } TEST task_queue_test(void) { - event_loop loop; - event_loop_init(&loop); + event_loop *loop = event_loop_create(); db_conn conn; db_connect("127.0.0.1", 6379, "local_scheduler", "", -1, &conn); - int64_t index = db_attach(&conn, &loop, 0); + db_attach(&conn, loop); task_spec *task = example_task(); task_queue_submit_task(&conn, globally_unique_id(), task); - while (1) { - int num_ready = event_loop_poll(&loop, 100); - if (num_ready < 0) { - exit(-1); - } - if (num_ready == 0) { - break; - } - for (int i = 0; i < event_loop_size(&loop); ++i) { - struct pollfd *waiting = event_loop_get(&loop, i); - if (waiting->revents == 0) - continue; - if (i == index) { - db_event(&conn); - } - } - } + event_loop_add_timer(loop, 100, timeout_handler, NULL); + event_loop_run(loop); free_task_spec(task); db_disconnect(&conn); - event_loop_free(&loop); + event_loop_destroy(loop); PASS(); } SUITE(db_tests) { redisContext *context = redisConnect("127.0.0.1", 6379); - redisCommand(context, "FLUSHALL"); + freeReplyObject(redisCommand(context, "FLUSHALL")); RUN_REDIS_TEST(context, object_table_lookup_test); - RUN_TEST(task_queue_test); + RUN_REDIS_TEST(context, task_queue_test); redisFree(context); } diff --git a/test/io_tests.c b/test/io_tests.c index 9216aa56..1e8d2083 100644 --- a/test/io_tests.c +++ b/test/io_tests.c @@ -35,6 +35,7 @@ TEST ipc_socket_test(void) { uint8_t *bytes; read_bytes(client_fd, &bytes, &len); ASSERT(memcmp(test_bytes, bytes, len) == 0); + free(bytes); close(client_fd); close(socket_fd); unlink(socket_pathname); diff --git a/test/redis_tests.c b/test/redis_tests.c index d8df538e..d527d047 100644 --- a/test/redis_tests.c +++ b/test/redis_tests.c @@ -3,6 +3,8 @@ #include #include +#include "utarray.h" + #include "event_loop.h" #include "state/db.h" #include "state/redis.h" @@ -11,35 +13,27 @@ SUITE(redis_tests); -int lookup_successful = 0; const char *test_set_format = "SET %s %s"; const char *test_get_format = "GET %s"; const char *test_key = "foo"; const char *test_value = "bar"; +UT_array *connections = NULL; + +int async_redis_socket_test_callback_called = 0; void async_redis_socket_test_callback(redisAsyncContext *ac, void *r, void *privdata) { + async_redis_socket_test_callback_called = 1; redisContext *context = redisConnect("127.0.0.1", 6379); redisReply *reply = redisCommand(context, test_get_format, test_key); redisFree(context); - assert(reply != NULL); + CHECK(reply != NULL); if (strcmp(reply->str, test_value)) { freeReplyObject(reply); - assert(0); + CHECK(0); } freeReplyObject(reply); - lookup_successful = 1; -} - -void logging_test_callback(redisAsyncContext *ac, void *r, void *privdata) { - redisContext *context = redisConnect("127.0.0.1", 6379); - redisReply *reply = redisCommand(context, "KEYS %s", "log:*"); - redisFree(context); - assert(reply != NULL); - assert(reply->elements > 0); - freeReplyObject(reply); - lookup_successful = 1; } TEST redis_socket_test(void) { @@ -73,117 +67,145 @@ TEST redis_socket_test(void) { PASS(); } +void redis_read_callback(event_loop *loop, int fd, void *context, int events) { + db_conn *conn = context; + char *cmd = read_string(fd); + redisAsyncCommand(conn->context, async_redis_socket_test_callback, NULL, cmd, + conn->client_id, 0); + free(cmd); +} + +void redis_accept_callback(event_loop *loop, + int socket_fd, + void *context, + int events) { + int accept_fd = accept_client(socket_fd); + CHECK(accept_fd >= 0); + utarray_push_back(connections, &accept_fd); + event_loop_add_file(loop, accept_fd, EVENT_LOOP_READ, redis_read_callback, + context); +} + +int64_t timeout_handler(event_loop *loop, int64_t id, void *context) { + event_loop_stop(loop); + return -1; +} + TEST async_redis_socket_test(void) { - int socket_fd, server_fd, client_fd; - event_loop loop; - event_loop_init(&loop); + utarray_new(connections, &ut_int_icd); + event_loop *loop = event_loop_create(); + /* Start IPC channel. */ const char *socket_pathname = "async-redis-test-socket"; - socket_fd = bind_ipc_sock(socket_pathname); + int socket_fd = bind_ipc_sock(socket_pathname); ASSERT(socket_fd >= 0); - int64_t ipc_index = event_loop_attach(&loop, 1, NULL, socket_fd, POLLIN); + utarray_push_back(connections, &socket_fd); /* Start connection to Redis. */ db_conn conn; db_connect("127.0.0.1", 6379, "", "", 0, &conn); - int64_t db_index = db_attach(&conn, &loop, 0); + db_attach(&conn, loop); /* Send a command to the Redis process. */ - client_fd = connect_ipc_sock(socket_pathname); + int client_fd = connect_ipc_sock(socket_pathname); ASSERT(client_fd >= 0); + utarray_push_back(connections, &client_fd); write_formatted_string(client_fd, test_set_format, test_key, test_value); - while (!lookup_successful) { - int num_ready = event_loop_poll(&loop, -1); - if (num_ready < 0) { - exit(-1); - } - for (int i = 0; i < event_loop_size(&loop); ++i) { - struct pollfd *waiting = event_loop_get(&loop, i); - if (waiting->revents == 0) - continue; - if (i == db_index) { - db_event(&conn); - } else if (i == ipc_index) { - /* For some reason, this check is necessary for Travis - * to pass these tests. */ - ASSERT(waiting->revents & POLLIN); - server_fd = accept_client(socket_fd); - ASSERT(server_fd >= 0); - event_loop_attach(&loop, 1, NULL, server_fd, POLLIN); - } else { - char *cmd = read_string(waiting->fd); - redisAsyncCommand(conn.context, async_redis_socket_test_callback, NULL, - cmd, conn.client_id, 0); - free(cmd); - } - } - } + event_loop_add_file(loop, client_fd, EVENT_LOOP_READ, redis_read_callback, + &conn); + event_loop_add_file(loop, socket_fd, EVENT_LOOP_READ, redis_accept_callback, + &conn); + event_loop_add_timer(loop, 100, timeout_handler, NULL); + event_loop_run(loop); + + CHECK(async_redis_socket_test_callback_called); + db_disconnect(&conn); - event_loop_free(&loop); - close(server_fd); - close(client_fd); - close(socket_fd); + event_loop_destroy(loop); + for (int *p = (int *) utarray_front(connections); p != NULL; + p = (int *) utarray_next(connections, p)) { + close(*p); + } unlink(socket_pathname); - lookup_successful = 0; + utarray_free(connections); PASS(); } +int logging_test_callback_called = 0; + +void logging_test_callback(redisAsyncContext *ac, void *r, void *privdata) { + logging_test_callback_called = 1; + redisContext *context = redisConnect("127.0.0.1", 6379); + redisReply *reply = redisCommand(context, "KEYS %s", "log:*"); + redisFree(context); + CHECK(reply != NULL); + CHECK(reply->elements > 0); + freeReplyObject(reply); +} + +void logging_read_callback(event_loop *loop, + int fd, + void *context, + int events) { + db_conn *conn = context; + char *cmd = read_string(fd); + redisAsyncCommand(conn->context, logging_test_callback, NULL, cmd, + conn->client_id, 0); + free(cmd); +} + +void logging_accept_callback(event_loop *loop, + int socket_fd, + void *context, + int events) { + int accept_fd = accept_client(socket_fd); + CHECK(accept_fd >= 0); + utarray_push_back(connections, &accept_fd); + event_loop_add_file(loop, accept_fd, EVENT_LOOP_READ, logging_read_callback, + context); +} + TEST logging_test(void) { - int socket_fd, server_fd, client_fd; - event_loop loop; - event_loop_init(&loop); + utarray_new(connections, &ut_int_icd); + event_loop *loop = event_loop_create(); + /* Start IPC channel. */ const char *socket_pathname = "logging-test-socket"; - socket_fd = bind_ipc_sock(socket_pathname); + int socket_fd = bind_ipc_sock(socket_pathname); ASSERT(socket_fd >= 0); - int64_t ipc_index = event_loop_attach(&loop, 1, NULL, socket_fd, POLLIN); + utarray_push_back(connections, &socket_fd); /* Start connection to Redis. */ db_conn conn; db_connect("127.0.0.1", 6379, "", "", 0, &conn); - int64_t db_index = db_attach(&conn, &loop, 0); + db_attach(&conn, loop); /* Send a command to the Redis process. */ - client_fd = connect_ipc_sock(socket_pathname); + int client_fd = connect_ipc_sock(socket_pathname); ASSERT(client_fd >= 0); + utarray_push_back(connections, &client_fd); ray_logger *logger = init_ray_logger("worker", RAY_INFO, 0, &client_fd); ray_log(logger, RAY_INFO, "TEST", "Message"); - while (!lookup_successful) { - int num_ready = event_loop_poll(&loop, -1); - if (num_ready < 0) { - exit(-1); - } - for (int i = 0; i < event_loop_size(&loop); ++i) { - struct pollfd *waiting = event_loop_get(&loop, i); - if (waiting->revents == 0) - continue; - if (i == db_index) { - db_event(&conn); - } else if (i == ipc_index) { - /* For some reason, this check is necessary for Travis - * to pass these tests. */ - ASSERT(waiting->revents & POLLIN); - server_fd = accept_client(socket_fd); - ASSERT(server_fd >= 0); - event_loop_attach(&loop, 1, NULL, server_fd, POLLIN); - } else { - char *cmd = read_string(waiting->fd); - redisAsyncCommand(conn.context, logging_test_callback, NULL, cmd, - conn.client_id, 0); - free(cmd); - } - } - } + event_loop_add_file(loop, socket_fd, EVENT_LOOP_READ, logging_accept_callback, + &conn); + event_loop_add_file(loop, client_fd, EVENT_LOOP_READ, logging_read_callback, + &conn); + event_loop_add_timer(loop, 100, timeout_handler, NULL); + event_loop_run(loop); + + CHECK(logging_test_callback_called); + free_ray_logger(logger); db_disconnect(&conn); - event_loop_free(&loop); - close(server_fd); - close(client_fd); - close(socket_fd); + event_loop_destroy(loop); + for (int *p = (int *) utarray_front(connections); p != NULL; + p = (int *) utarray_next(connections, p)) { + close(*p); + } unlink(socket_pathname); - lookup_successful = 0; + utarray_free(connections); PASS(); } diff --git a/thirdparty/ae/ae.c b/thirdparty/ae/ae.c new file mode 100644 index 00000000..e66808a8 --- /dev/null +++ b/thirdparty/ae/ae.c @@ -0,0 +1,465 @@ +/* A simple event-driven programming library. Originally I wrote this code + * for the Jim's event-loop (Jim is a Tcl interpreter) but later translated + * it in form of a library for easy reuse. + * + * Copyright (c) 2006-2010, Salvatore Sanfilippo + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "ae.h" +#include "zmalloc.h" +#include "config.h" + +/* Include the best multiplexing layer supported by this system. + * The following should be ordered by performances, descending. */ +#ifdef HAVE_EVPORT +#include "ae_evport.c" +#else + #ifdef HAVE_EPOLL + #include "ae_epoll.c" + #else + #ifdef HAVE_KQUEUE + #include "ae_kqueue.c" + #else + #include "ae_select.c" + #endif + #endif +#endif + +aeEventLoop *aeCreateEventLoop(int setsize) { + aeEventLoop *eventLoop; + int i; + + if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err; + eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize); + eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize); + if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err; + eventLoop->setsize = setsize; + eventLoop->lastTime = time(NULL); + eventLoop->timeEventHead = NULL; + eventLoop->timeEventNextId = 0; + eventLoop->stop = 0; + eventLoop->maxfd = -1; + eventLoop->beforesleep = NULL; + if (aeApiCreate(eventLoop) == -1) goto err; + /* Events with mask == AE_NONE are not set. So let's initialize the + * vector with it. */ + for (i = 0; i < setsize; i++) + eventLoop->events[i].mask = AE_NONE; + return eventLoop; + +err: + if (eventLoop) { + zfree(eventLoop->events); + zfree(eventLoop->fired); + zfree(eventLoop); + } + return NULL; +} + +/* Return the current set size. */ +int aeGetSetSize(aeEventLoop *eventLoop) { + return eventLoop->setsize; +} + +/* Resize the maximum set size of the event loop. + * If the requested set size is smaller than the current set size, but + * there is already a file descriptor in use that is >= the requested + * set size minus one, AE_ERR is returned and the operation is not + * performed at all. + * + * Otherwise AE_OK is returned and the operation is successful. */ +int aeResizeSetSize(aeEventLoop *eventLoop, int setsize) { + int i; + + if (setsize == eventLoop->setsize) return AE_OK; + if (eventLoop->maxfd >= setsize) return AE_ERR; + if (aeApiResize(eventLoop,setsize) == -1) return AE_ERR; + + eventLoop->events = zrealloc(eventLoop->events,sizeof(aeFileEvent)*setsize); + eventLoop->fired = zrealloc(eventLoop->fired,sizeof(aeFiredEvent)*setsize); + eventLoop->setsize = setsize; + + /* Make sure that if we created new slots, they are initialized with + * an AE_NONE mask. */ + for (i = eventLoop->maxfd+1; i < setsize; i++) + eventLoop->events[i].mask = AE_NONE; + return AE_OK; +} + +void aeDeleteEventLoop(aeEventLoop *eventLoop) { + aeApiFree(eventLoop); + zfree(eventLoop->events); + zfree(eventLoop->fired); + zfree(eventLoop); +} + +void aeStop(aeEventLoop *eventLoop) { + eventLoop->stop = 1; +} + +int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, + aeFileProc *proc, void *clientData) +{ + if (fd >= eventLoop->setsize) { + errno = ERANGE; + return AE_ERR; + } + aeFileEvent *fe = &eventLoop->events[fd]; + + if (aeApiAddEvent(eventLoop, fd, mask) == -1) + return AE_ERR; + fe->mask |= mask; + if (mask & AE_READABLE) fe->rfileProc = proc; + if (mask & AE_WRITABLE) fe->wfileProc = proc; + fe->clientData = clientData; + if (fd > eventLoop->maxfd) + eventLoop->maxfd = fd; + return AE_OK; +} + +void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask) +{ + if (fd >= eventLoop->setsize) return; + aeFileEvent *fe = &eventLoop->events[fd]; + if (fe->mask == AE_NONE) return; + + aeApiDelEvent(eventLoop, fd, mask); + fe->mask = fe->mask & (~mask); + if (fd == eventLoop->maxfd && fe->mask == AE_NONE) { + /* Update the max fd */ + int j; + + for (j = eventLoop->maxfd-1; j >= 0; j--) + if (eventLoop->events[j].mask != AE_NONE) break; + eventLoop->maxfd = j; + } +} + +int aeGetFileEvents(aeEventLoop *eventLoop, int fd) { + if (fd >= eventLoop->setsize) return 0; + aeFileEvent *fe = &eventLoop->events[fd]; + + return fe->mask; +} + +static void aeGetTime(long *seconds, long *milliseconds) +{ + struct timeval tv; + + gettimeofday(&tv, NULL); + *seconds = tv.tv_sec; + *milliseconds = tv.tv_usec/1000; +} + +static void aeAddMillisecondsToNow(long long milliseconds, long *sec, long *ms) { + long cur_sec, cur_ms, when_sec, when_ms; + + aeGetTime(&cur_sec, &cur_ms); + when_sec = cur_sec + milliseconds/1000; + when_ms = cur_ms + milliseconds%1000; + if (when_ms >= 1000) { + when_sec ++; + when_ms -= 1000; + } + *sec = when_sec; + *ms = when_ms; +} + +long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, + aeTimeProc *proc, void *clientData, + aeEventFinalizerProc *finalizerProc) +{ + long long id = eventLoop->timeEventNextId++; + aeTimeEvent *te; + + te = zmalloc(sizeof(*te)); + if (te == NULL) return AE_ERR; + te->id = id; + aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms); + te->timeProc = proc; + te->finalizerProc = finalizerProc; + te->clientData = clientData; + te->next = eventLoop->timeEventHead; + eventLoop->timeEventHead = te; + return id; +} + +int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id) +{ + aeTimeEvent *te = eventLoop->timeEventHead; + while(te) { + if (te->id == id) { + te->id = AE_DELETED_EVENT_ID; + return AE_OK; + } + te = te->next; + } + return AE_ERR; /* NO event with the specified ID found */ +} + +/* Search the first timer to fire. + * This operation is useful to know how many time the select can be + * put in sleep without to delay any event. + * If there are no timers NULL is returned. + * + * Note that's O(N) since time events are unsorted. + * Possible optimizations (not needed by Redis so far, but...): + * 1) Insert the event in order, so that the nearest is just the head. + * Much better but still insertion or deletion of timers is O(N). + * 2) Use a skiplist to have this operation as O(1) and insertion as O(log(N)). + */ +static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop) +{ + aeTimeEvent *te = eventLoop->timeEventHead; + aeTimeEvent *nearest = NULL; + + while(te) { + if (!nearest || te->when_sec < nearest->when_sec || + (te->when_sec == nearest->when_sec && + te->when_ms < nearest->when_ms)) + nearest = te; + te = te->next; + } + return nearest; +} + +/* Process time events */ +static int processTimeEvents(aeEventLoop *eventLoop) { + int processed = 0; + aeTimeEvent *te, *prev; + long long maxId; + time_t now = time(NULL); + + /* If the system clock is moved to the future, and then set back to the + * right value, time events may be delayed in a random way. Often this + * means that scheduled operations will not be performed soon enough. + * + * Here we try to detect system clock skews, and force all the time + * events to be processed ASAP when this happens: the idea is that + * processing events earlier is less dangerous than delaying them + * indefinitely, and practice suggests it is. */ + if (now < eventLoop->lastTime) { + te = eventLoop->timeEventHead; + while(te) { + te->when_sec = 0; + te = te->next; + } + } + eventLoop->lastTime = now; + + prev = NULL; + te = eventLoop->timeEventHead; + maxId = eventLoop->timeEventNextId-1; + while(te) { + long now_sec, now_ms; + long long id; + + /* Remove events scheduled for deletion. */ + if (te->id == AE_DELETED_EVENT_ID) { + aeTimeEvent *next = te->next; + if (prev == NULL) + eventLoop->timeEventHead = te->next; + else + prev->next = te->next; + if (te->finalizerProc) + te->finalizerProc(eventLoop, te->clientData); + zfree(te); + te = next; + continue; + } + + /* Make sure we don't process time events created by time events in + * this iteration. Note that this check is currently useless: we always + * add new timers on the head, however if we change the implementation + * detail, this check may be useful again: we keep it here for future + * defense. */ + if (te->id > maxId) { + te = te->next; + continue; + } + aeGetTime(&now_sec, &now_ms); + if (now_sec > te->when_sec || + (now_sec == te->when_sec && now_ms >= te->when_ms)) + { + int retval; + + id = te->id; + retval = te->timeProc(eventLoop, id, te->clientData); + processed++; + if (retval != AE_NOMORE) { + aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms); + } else { + te->id = AE_DELETED_EVENT_ID; + } + } + prev = te; + te = te->next; + } + return processed; +} + +/* Process every pending time event, then every pending file event + * (that may be registered by time event callbacks just processed). + * Without special flags the function sleeps until some file event + * fires, or when the next time event occurs (if any). + * + * If flags is 0, the function does nothing and returns. + * if flags has AE_ALL_EVENTS set, all the kind of events are processed. + * if flags has AE_FILE_EVENTS set, file events are processed. + * if flags has AE_TIME_EVENTS set, time events are processed. + * if flags has AE_DONT_WAIT set the function returns ASAP until all + * the events that's possible to process without to wait are processed. + * + * The function returns the number of events processed. */ +int aeProcessEvents(aeEventLoop *eventLoop, int flags) +{ + int processed = 0, numevents; + + /* Nothing to do? return ASAP */ + if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0; + + /* Note that we want call select() even if there are no + * file events to process as long as we want to process time + * events, in order to sleep until the next time event is ready + * to fire. */ + if (eventLoop->maxfd != -1 || + ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { + int j; + aeTimeEvent *shortest = NULL; + struct timeval tv, *tvp; + + if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) + shortest = aeSearchNearestTimer(eventLoop); + if (shortest) { + long now_sec, now_ms; + + aeGetTime(&now_sec, &now_ms); + tvp = &tv; + + /* How many milliseconds we need to wait for the next + * time event to fire? */ + long long ms = + (shortest->when_sec - now_sec)*1000 + + shortest->when_ms - now_ms; + + if (ms > 0) { + tvp->tv_sec = ms/1000; + tvp->tv_usec = (ms % 1000)*1000; + } else { + tvp->tv_sec = 0; + tvp->tv_usec = 0; + } + } else { + /* If we have to check for events but need to return + * ASAP because of AE_DONT_WAIT we need to set the timeout + * to zero */ + if (flags & AE_DONT_WAIT) { + tv.tv_sec = tv.tv_usec = 0; + tvp = &tv; + } else { + /* Otherwise we can block */ + tvp = NULL; /* wait forever */ + } + } + + numevents = aeApiPoll(eventLoop, tvp); + for (j = 0; j < numevents; j++) { + aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; + int mask = eventLoop->fired[j].mask; + int fd = eventLoop->fired[j].fd; + int rfired = 0; + + /* note the fe->mask & mask & ... code: maybe an already processed + * event removed an element that fired and we still didn't + * processed, so we check if the event is still valid. */ + if (fe->mask & mask & AE_READABLE) { + rfired = 1; + fe->rfileProc(eventLoop,fd,fe->clientData,mask); + } + if (fe->mask & mask & AE_WRITABLE) { + if (!rfired || fe->wfileProc != fe->rfileProc) + fe->wfileProc(eventLoop,fd,fe->clientData,mask); + } + processed++; + } + } + /* Check time events */ + if (flags & AE_TIME_EVENTS) + processed += processTimeEvents(eventLoop); + + return processed; /* return the number of processed file/time events */ +} + +/* Wait for milliseconds until the given file descriptor becomes + * writable/readable/exception */ +int aeWait(int fd, int mask, long long milliseconds) { + struct pollfd pfd; + int retmask = 0, retval; + + memset(&pfd, 0, sizeof(pfd)); + pfd.fd = fd; + if (mask & AE_READABLE) pfd.events |= POLLIN; + if (mask & AE_WRITABLE) pfd.events |= POLLOUT; + + if ((retval = poll(&pfd, 1, milliseconds))== 1) { + if (pfd.revents & POLLIN) retmask |= AE_READABLE; + if (pfd.revents & POLLOUT) retmask |= AE_WRITABLE; + if (pfd.revents & POLLERR) retmask |= AE_WRITABLE; + if (pfd.revents & POLLHUP) retmask |= AE_WRITABLE; + return retmask; + } else { + return retval; + } +} + +void aeMain(aeEventLoop *eventLoop) { + eventLoop->stop = 0; + while (!eventLoop->stop) { + if (eventLoop->beforesleep != NULL) + eventLoop->beforesleep(eventLoop); + aeProcessEvents(eventLoop, AE_ALL_EVENTS); + } +} + +char *aeGetApiName(void) { + return aeApiName(); +} + +void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) { + eventLoop->beforesleep = beforesleep; +} diff --git a/thirdparty/ae/ae.h b/thirdparty/ae/ae.h new file mode 100644 index 00000000..827c4c9e --- /dev/null +++ b/thirdparty/ae/ae.h @@ -0,0 +1,123 @@ +/* A simple event-driven programming library. Originally I wrote this code + * for the Jim's event-loop (Jim is a Tcl interpreter) but later translated + * it in form of a library for easy reuse. + * + * Copyright (c) 2006-2012, Salvatore Sanfilippo + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef __AE_H__ +#define __AE_H__ + +#include + +#define AE_OK 0 +#define AE_ERR -1 + +#define AE_NONE 0 +#define AE_READABLE 1 +#define AE_WRITABLE 2 + +#define AE_FILE_EVENTS 1 +#define AE_TIME_EVENTS 2 +#define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS) +#define AE_DONT_WAIT 4 + +#define AE_NOMORE -1 +#define AE_DELETED_EVENT_ID -1 + +/* Macros */ +#define AE_NOTUSED(V) ((void) V) + +struct aeEventLoop; + +/* Types and data structures */ +typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask); +typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData); +typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData); +typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop); + +/* File event structure */ +typedef struct aeFileEvent { + int mask; /* one of AE_(READABLE|WRITABLE) */ + aeFileProc *rfileProc; + aeFileProc *wfileProc; + void *clientData; +} aeFileEvent; + +/* Time event structure */ +typedef struct aeTimeEvent { + long long id; /* time event identifier. */ + long when_sec; /* seconds */ + long when_ms; /* milliseconds */ + aeTimeProc *timeProc; + aeEventFinalizerProc *finalizerProc; + void *clientData; + struct aeTimeEvent *next; +} aeTimeEvent; + +/* A fired event */ +typedef struct aeFiredEvent { + int fd; + int mask; +} aeFiredEvent; + +/* State of an event based program */ +typedef struct aeEventLoop { + int maxfd; /* highest file descriptor currently registered */ + int setsize; /* max number of file descriptors tracked */ + long long timeEventNextId; + time_t lastTime; /* Used to detect system clock skew */ + aeFileEvent *events; /* Registered events */ + aeFiredEvent *fired; /* Fired events */ + aeTimeEvent *timeEventHead; + int stop; + void *apidata; /* This is used for polling API specific data */ + aeBeforeSleepProc *beforesleep; +} aeEventLoop; + +/* Prototypes */ +aeEventLoop *aeCreateEventLoop(int setsize); +void aeDeleteEventLoop(aeEventLoop *eventLoop); +void aeStop(aeEventLoop *eventLoop); +int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, + aeFileProc *proc, void *clientData); +void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask); +int aeGetFileEvents(aeEventLoop *eventLoop, int fd); +long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, + aeTimeProc *proc, void *clientData, + aeEventFinalizerProc *finalizerProc); +int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id); +int aeProcessEvents(aeEventLoop *eventLoop, int flags); +int aeWait(int fd, int mask, long long milliseconds); +void aeMain(aeEventLoop *eventLoop); +char *aeGetApiName(void); +void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep); +int aeGetSetSize(aeEventLoop *eventLoop); +int aeResizeSetSize(aeEventLoop *eventLoop, int setsize); + +#endif diff --git a/thirdparty/ae/ae_epoll.c b/thirdparty/ae/ae_epoll.c new file mode 100644 index 00000000..410aac70 --- /dev/null +++ b/thirdparty/ae/ae_epoll.c @@ -0,0 +1,135 @@ +/* Linux epoll(2) based ae.c module + * + * Copyright (c) 2009-2012, Salvatore Sanfilippo + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + + +#include + +typedef struct aeApiState { + int epfd; + struct epoll_event *events; +} aeApiState; + +static int aeApiCreate(aeEventLoop *eventLoop) { + aeApiState *state = zmalloc(sizeof(aeApiState)); + + if (!state) return -1; + state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize); + if (!state->events) { + zfree(state); + return -1; + } + state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */ + if (state->epfd == -1) { + zfree(state->events); + zfree(state); + return -1; + } + eventLoop->apidata = state; + return 0; +} + +static int aeApiResize(aeEventLoop *eventLoop, int setsize) { + aeApiState *state = eventLoop->apidata; + + state->events = zrealloc(state->events, sizeof(struct epoll_event)*setsize); + return 0; +} + +static void aeApiFree(aeEventLoop *eventLoop) { + aeApiState *state = eventLoop->apidata; + + close(state->epfd); + zfree(state->events); + zfree(state); +} + +static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) { + aeApiState *state = eventLoop->apidata; + struct epoll_event ee = {0}; /* avoid valgrind warning */ + /* If the fd was already monitored for some event, we need a MOD + * operation. Otherwise we need an ADD operation. */ + int op = eventLoop->events[fd].mask == AE_NONE ? + EPOLL_CTL_ADD : EPOLL_CTL_MOD; + + ee.events = 0; + mask |= eventLoop->events[fd].mask; /* Merge old events */ + if (mask & AE_READABLE) ee.events |= EPOLLIN; + if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; + ee.data.fd = fd; + if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1; + return 0; +} + +static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) { + aeApiState *state = eventLoop->apidata; + struct epoll_event ee = {0}; /* avoid valgrind warning */ + int mask = eventLoop->events[fd].mask & (~delmask); + + ee.events = 0; + if (mask & AE_READABLE) ee.events |= EPOLLIN; + if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; + ee.data.fd = fd; + if (mask != AE_NONE) { + epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee); + } else { + /* Note, Kernel < 2.6.9 requires a non null event pointer even for + * EPOLL_CTL_DEL. */ + epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee); + } +} + +static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { + aeApiState *state = eventLoop->apidata; + int retval, numevents = 0; + + retval = epoll_wait(state->epfd,state->events,eventLoop->setsize, + tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1); + if (retval > 0) { + int j; + + numevents = retval; + for (j = 0; j < numevents; j++) { + int mask = 0; + struct epoll_event *e = state->events+j; + + if (e->events & EPOLLIN) mask |= AE_READABLE; + if (e->events & EPOLLOUT) mask |= AE_WRITABLE; + if (e->events & EPOLLERR) mask |= AE_WRITABLE; + if (e->events & EPOLLHUP) mask |= AE_WRITABLE; + eventLoop->fired[j].fd = e->data.fd; + eventLoop->fired[j].mask = mask; + } + } + return numevents; +} + +static char *aeApiName(void) { + return "epoll"; +} diff --git a/thirdparty/ae/ae_evport.c b/thirdparty/ae/ae_evport.c new file mode 100644 index 00000000..5c317bec --- /dev/null +++ b/thirdparty/ae/ae_evport.c @@ -0,0 +1,320 @@ +/* ae.c module for illumos event ports. + * + * Copyright (c) 2012, Joyent, Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + + +#include +#include +#include +#include + +#include +#include + +#include + +static int evport_debug = 0; + +/* + * This file implements the ae API using event ports, present on Solaris-based + * systems since Solaris 10. Using the event port interface, we associate file + * descriptors with the port. Each association also includes the set of poll(2) + * events that the consumer is interested in (e.g., POLLIN and POLLOUT). + * + * There's one tricky piece to this implementation: when we return events via + * aeApiPoll, the corresponding file descriptors become dissociated from the + * port. This is necessary because poll events are level-triggered, so if the + * fd didn't become dissociated, it would immediately fire another event since + * the underlying state hasn't changed yet. We must re-associate the file + * descriptor, but only after we know that our caller has actually read from it. + * The ae API does not tell us exactly when that happens, but we do know that + * it must happen by the time aeApiPoll is called again. Our solution is to + * keep track of the last fds returned by aeApiPoll and re-associate them next + * time aeApiPoll is invoked. + * + * To summarize, in this module, each fd association is EITHER (a) represented + * only via the in-kernel association OR (b) represented by pending_fds and + * pending_masks. (b) is only true for the last fds we returned from aeApiPoll, + * and only until we enter aeApiPoll again (at which point we restore the + * in-kernel association). + */ +#define MAX_EVENT_BATCHSZ 512 + +typedef struct aeApiState { + int portfd; /* event port */ + int npending; /* # of pending fds */ + int pending_fds[MAX_EVENT_BATCHSZ]; /* pending fds */ + int pending_masks[MAX_EVENT_BATCHSZ]; /* pending fds' masks */ +} aeApiState; + +static int aeApiCreate(aeEventLoop *eventLoop) { + int i; + aeApiState *state = zmalloc(sizeof(aeApiState)); + if (!state) return -1; + + state->portfd = port_create(); + if (state->portfd == -1) { + zfree(state); + return -1; + } + + state->npending = 0; + + for (i = 0; i < MAX_EVENT_BATCHSZ; i++) { + state->pending_fds[i] = -1; + state->pending_masks[i] = AE_NONE; + } + + eventLoop->apidata = state; + return 0; +} + +static int aeApiResize(aeEventLoop *eventLoop, int setsize) { + /* Nothing to resize here. */ + return 0; +} + +static void aeApiFree(aeEventLoop *eventLoop) { + aeApiState *state = eventLoop->apidata; + + close(state->portfd); + zfree(state); +} + +static int aeApiLookupPending(aeApiState *state, int fd) { + int i; + + for (i = 0; i < state->npending; i++) { + if (state->pending_fds[i] == fd) + return (i); + } + + return (-1); +} + +/* + * Helper function to invoke port_associate for the given fd and mask. + */ +static int aeApiAssociate(const char *where, int portfd, int fd, int mask) { + int events = 0; + int rv, err; + + if (mask & AE_READABLE) + events |= POLLIN; + if (mask & AE_WRITABLE) + events |= POLLOUT; + + if (evport_debug) + fprintf(stderr, "%s: port_associate(%d, 0x%x) = ", where, fd, events); + + rv = port_associate(portfd, PORT_SOURCE_FD, fd, events, + (void *)(uintptr_t)mask); + err = errno; + + if (evport_debug) + fprintf(stderr, "%d (%s)\n", rv, rv == 0 ? "no error" : strerror(err)); + + if (rv == -1) { + fprintf(stderr, "%s: port_associate: %s\n", where, strerror(err)); + + if (err == EAGAIN) + fprintf(stderr, "aeApiAssociate: event port limit exceeded."); + } + + return rv; +} + +static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) { + aeApiState *state = eventLoop->apidata; + int fullmask, pfd; + + if (evport_debug) + fprintf(stderr, "aeApiAddEvent: fd %d mask 0x%x\n", fd, mask); + + /* + * Since port_associate's "events" argument replaces any existing events, we + * must be sure to include whatever events are already associated when + * we call port_associate() again. + */ + fullmask = mask | eventLoop->events[fd].mask; + pfd = aeApiLookupPending(state, fd); + + if (pfd != -1) { + /* + * This fd was recently returned from aeApiPoll. It should be safe to + * assume that the consumer has processed that poll event, but we play + * it safer by simply updating pending_mask. The fd will be + * re-associated as usual when aeApiPoll is called again. + */ + if (evport_debug) + fprintf(stderr, "aeApiAddEvent: adding to pending fd %d\n", fd); + state->pending_masks[pfd] |= fullmask; + return 0; + } + + return (aeApiAssociate("aeApiAddEvent", state->portfd, fd, fullmask)); +} + +static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask) { + aeApiState *state = eventLoop->apidata; + int fullmask, pfd; + + if (evport_debug) + fprintf(stderr, "del fd %d mask 0x%x\n", fd, mask); + + pfd = aeApiLookupPending(state, fd); + + if (pfd != -1) { + if (evport_debug) + fprintf(stderr, "deleting event from pending fd %d\n", fd); + + /* + * This fd was just returned from aeApiPoll, so it's not currently + * associated with the port. All we need to do is update + * pending_mask appropriately. + */ + state->pending_masks[pfd] &= ~mask; + + if (state->pending_masks[pfd] == AE_NONE) + state->pending_fds[pfd] = -1; + + return; + } + + /* + * The fd is currently associated with the port. Like with the add case + * above, we must look at the full mask for the file descriptor before + * updating that association. We don't have a good way of knowing what the + * events are without looking into the eventLoop state directly. We rely on + * the fact that our caller has already updated the mask in the eventLoop. + */ + + fullmask = eventLoop->events[fd].mask; + if (fullmask == AE_NONE) { + /* + * We're removing *all* events, so use port_dissociate to remove the + * association completely. Failure here indicates a bug. + */ + if (evport_debug) + fprintf(stderr, "aeApiDelEvent: port_dissociate(%d)\n", fd); + + if (port_dissociate(state->portfd, PORT_SOURCE_FD, fd) != 0) { + perror("aeApiDelEvent: port_dissociate"); + abort(); /* will not return */ + } + } else if (aeApiAssociate("aeApiDelEvent", state->portfd, fd, + fullmask) != 0) { + /* + * ENOMEM is a potentially transient condition, but the kernel won't + * generally return it unless things are really bad. EAGAIN indicates + * we've reached an resource limit, for which it doesn't make sense to + * retry (counter-intuitively). All other errors indicate a bug. In any + * of these cases, the best we can do is to abort. + */ + abort(); /* will not return */ + } +} + +static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { + aeApiState *state = eventLoop->apidata; + struct timespec timeout, *tsp; + int mask, i; + uint_t nevents; + port_event_t event[MAX_EVENT_BATCHSZ]; + + /* + * If we've returned fd events before, we must re-associate them with the + * port now, before calling port_get(). See the block comment at the top of + * this file for an explanation of why. + */ + for (i = 0; i < state->npending; i++) { + if (state->pending_fds[i] == -1) + /* This fd has since been deleted. */ + continue; + + if (aeApiAssociate("aeApiPoll", state->portfd, + state->pending_fds[i], state->pending_masks[i]) != 0) { + /* See aeApiDelEvent for why this case is fatal. */ + abort(); + } + + state->pending_masks[i] = AE_NONE; + state->pending_fds[i] = -1; + } + + state->npending = 0; + + if (tvp != NULL) { + timeout.tv_sec = tvp->tv_sec; + timeout.tv_nsec = tvp->tv_usec * 1000; + tsp = &timeout; + } else { + tsp = NULL; + } + + /* + * port_getn can return with errno == ETIME having returned some events (!). + * So if we get ETIME, we check nevents, too. + */ + nevents = 1; + if (port_getn(state->portfd, event, MAX_EVENT_BATCHSZ, &nevents, + tsp) == -1 && (errno != ETIME || nevents == 0)) { + if (errno == ETIME || errno == EINTR) + return 0; + + /* Any other error indicates a bug. */ + perror("aeApiPoll: port_get"); + abort(); + } + + state->npending = nevents; + + for (i = 0; i < nevents; i++) { + mask = 0; + if (event[i].portev_events & POLLIN) + mask |= AE_READABLE; + if (event[i].portev_events & POLLOUT) + mask |= AE_WRITABLE; + + eventLoop->fired[i].fd = event[i].portev_object; + eventLoop->fired[i].mask = mask; + + if (evport_debug) + fprintf(stderr, "aeApiPoll: fd %d mask 0x%x\n", + (int)event[i].portev_object, mask); + + state->pending_fds[i] = event[i].portev_object; + state->pending_masks[i] = (uintptr_t)event[i].portev_user; + } + + return nevents; +} + +static char *aeApiName(void) { + return "evport"; +} diff --git a/thirdparty/ae/ae_kqueue.c b/thirdparty/ae/ae_kqueue.c new file mode 100644 index 00000000..6796f4ce --- /dev/null +++ b/thirdparty/ae/ae_kqueue.c @@ -0,0 +1,138 @@ +/* Kqueue(2)-based ae.c module + * + * Copyright (C) 2009 Harish Mallipeddi - harish.mallipeddi@gmail.com + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + + +#include +#include +#include + +typedef struct aeApiState { + int kqfd; + struct kevent *events; +} aeApiState; + +static int aeApiCreate(aeEventLoop *eventLoop) { + aeApiState *state = zmalloc(sizeof(aeApiState)); + + if (!state) return -1; + state->events = zmalloc(sizeof(struct kevent)*eventLoop->setsize); + if (!state->events) { + zfree(state); + return -1; + } + state->kqfd = kqueue(); + if (state->kqfd == -1) { + zfree(state->events); + zfree(state); + return -1; + } + eventLoop->apidata = state; + return 0; +} + +static int aeApiResize(aeEventLoop *eventLoop, int setsize) { + aeApiState *state = eventLoop->apidata; + + state->events = zrealloc(state->events, sizeof(struct kevent)*setsize); + return 0; +} + +static void aeApiFree(aeEventLoop *eventLoop) { + aeApiState *state = eventLoop->apidata; + + close(state->kqfd); + zfree(state->events); + zfree(state); +} + +static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) { + aeApiState *state = eventLoop->apidata; + struct kevent ke; + + if (mask & AE_READABLE) { + EV_SET(&ke, fd, EVFILT_READ, EV_ADD, 0, 0, NULL); + if (kevent(state->kqfd, &ke, 1, NULL, 0, NULL) == -1) return -1; + } + if (mask & AE_WRITABLE) { + EV_SET(&ke, fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL); + if (kevent(state->kqfd, &ke, 1, NULL, 0, NULL) == -1) return -1; + } + return 0; +} + +static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask) { + aeApiState *state = eventLoop->apidata; + struct kevent ke; + + if (mask & AE_READABLE) { + EV_SET(&ke, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); + kevent(state->kqfd, &ke, 1, NULL, 0, NULL); + } + if (mask & AE_WRITABLE) { + EV_SET(&ke, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); + kevent(state->kqfd, &ke, 1, NULL, 0, NULL); + } +} + +static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { + aeApiState *state = eventLoop->apidata; + int retval, numevents = 0; + + if (tvp != NULL) { + struct timespec timeout; + timeout.tv_sec = tvp->tv_sec; + timeout.tv_nsec = tvp->tv_usec * 1000; + retval = kevent(state->kqfd, NULL, 0, state->events, eventLoop->setsize, + &timeout); + } else { + retval = kevent(state->kqfd, NULL, 0, state->events, eventLoop->setsize, + NULL); + } + + if (retval > 0) { + int j; + + numevents = retval; + for(j = 0; j < numevents; j++) { + int mask = 0; + struct kevent *e = state->events+j; + + if (e->filter == EVFILT_READ) mask |= AE_READABLE; + if (e->filter == EVFILT_WRITE) mask |= AE_WRITABLE; + eventLoop->fired[j].fd = e->ident; + eventLoop->fired[j].mask = mask; + } + } + return numevents; +} + +static char *aeApiName(void) { + return "kqueue"; +} diff --git a/thirdparty/ae/ae_select.c b/thirdparty/ae/ae_select.c new file mode 100644 index 00000000..c039a8ea --- /dev/null +++ b/thirdparty/ae/ae_select.c @@ -0,0 +1,106 @@ +/* Select()-based ae.c module. + * + * Copyright (c) 2009-2012, Salvatore Sanfilippo + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + + +#include +#include + +typedef struct aeApiState { + fd_set rfds, wfds; + /* We need to have a copy of the fd sets as it's not safe to reuse + * FD sets after select(). */ + fd_set _rfds, _wfds; +} aeApiState; + +static int aeApiCreate(aeEventLoop *eventLoop) { + aeApiState *state = zmalloc(sizeof(aeApiState)); + + if (!state) return -1; + FD_ZERO(&state->rfds); + FD_ZERO(&state->wfds); + eventLoop->apidata = state; + return 0; +} + +static int aeApiResize(aeEventLoop *eventLoop, int setsize) { + /* Just ensure we have enough room in the fd_set type. */ + if (setsize >= FD_SETSIZE) return -1; + return 0; +} + +static void aeApiFree(aeEventLoop *eventLoop) { + zfree(eventLoop->apidata); +} + +static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) { + aeApiState *state = eventLoop->apidata; + + if (mask & AE_READABLE) FD_SET(fd,&state->rfds); + if (mask & AE_WRITABLE) FD_SET(fd,&state->wfds); + return 0; +} + +static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask) { + aeApiState *state = eventLoop->apidata; + + if (mask & AE_READABLE) FD_CLR(fd,&state->rfds); + if (mask & AE_WRITABLE) FD_CLR(fd,&state->wfds); +} + +static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { + aeApiState *state = eventLoop->apidata; + int retval, j, numevents = 0; + + memcpy(&state->_rfds,&state->rfds,sizeof(fd_set)); + memcpy(&state->_wfds,&state->wfds,sizeof(fd_set)); + + retval = select(eventLoop->maxfd+1, + &state->_rfds,&state->_wfds,NULL,tvp); + if (retval > 0) { + for (j = 0; j <= eventLoop->maxfd; j++) { + int mask = 0; + aeFileEvent *fe = &eventLoop->events[j]; + + if (fe->mask == AE_NONE) continue; + if (fe->mask & AE_READABLE && FD_ISSET(j,&state->_rfds)) + mask |= AE_READABLE; + if (fe->mask & AE_WRITABLE && FD_ISSET(j,&state->_wfds)) + mask |= AE_WRITABLE; + eventLoop->fired[numevents].fd = j; + eventLoop->fired[numevents].mask = mask; + numevents++; + } + } + return numevents; +} + +static char *aeApiName(void) { + return "select"; +} diff --git a/thirdparty/ae/config.h b/thirdparty/ae/config.h new file mode 100644 index 00000000..4f8e1ea1 --- /dev/null +++ b/thirdparty/ae/config.h @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2009-2012, Salvatore Sanfilippo + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef __CONFIG_H +#define __CONFIG_H + +#ifdef __APPLE__ +#include +#endif + +/* Test for polling API */ +#ifdef __linux__ +#define HAVE_EPOLL 1 +#endif + +#if (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined (__NetBSD__) +#define HAVE_KQUEUE 1 +#endif + +#ifdef __sun +#include +#ifdef _DTRACE_VERSION +#define HAVE_EVPORT 1 +#endif +#endif + + +#endif diff --git a/thirdparty/ae/zmalloc.h b/thirdparty/ae/zmalloc.h new file mode 100644 index 00000000..54c8a69c --- /dev/null +++ b/thirdparty/ae/zmalloc.h @@ -0,0 +1,16 @@ +#ifndef _ZMALLOC_H +#define _ZMALLOC_H + +#ifndef zmalloc +#define zmalloc malloc +#endif + +#ifndef zfree +#define zfree free +#endif + +#ifndef zrealloc +#define zrealloc realloc +#endif + +#endif /* _ZMALLOC_H */