Plasma subscribe (ray-project#227)
* Use object_info as notification, not just the object_id (see the decoding sketch below)

* Add a regression test for plasma managers connecting to the store after some objects have been created

* Send notifications for existing objects to new plasma subscribers

* Continuously try the request to the plasma manager instead of setting a timeout in the test case

* Use ray.services to start Redis in plasma test cases

* Fix test case
stephanie-wang authored and pcmoritz committed Jan 26, 2017
1 parent ab8c343 commit a5c8f28
Showing 3 changed files with 104 additions and 50 deletions.
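The first bullet above is the core protocol change: the store now pushes the full object_info record (object ID, data and metadata sizes, digest, deletion flag) to subscribers instead of a bare object ID, so a client can act on a notification without a follow-up lookup. Below is a minimal Python sketch of how a subscriber might decode such a fixed-size record from the notification socket; the field layout, sizes, and packing are illustrative assumptions, not the actual wire format defined in plasma.h.

import struct

# Assumed layout loosely mirroring the C object_info struct: a 20-byte
# object ID, two int64 sizes, a 20-byte digest, and a deletion flag.
# "=" disables padding; the real struct uses the C compiler's layout.
OBJECT_ID_SIZE = 20
DIGEST_SIZE = 20
RECORD_FMT = "={}sqq{}s?".format(OBJECT_ID_SIZE, DIGEST_SIZE)
RECORD_SIZE = struct.calcsize(RECORD_FMT)

def read_notification(sock):
    # Read exactly one object_info record from the subscription socket.
    buf = b""
    while len(buf) < RECORD_SIZE:
        chunk = sock.recv(RECORD_SIZE - len(buf))
        if not chunk:
            raise EOFError("plasma store closed the notification socket")
        buf += chunk
    object_id, data_size, metadata_size, digest, is_deletion = \
        struct.unpack(RECORD_FMT, buf)
    return object_id, data_size, metadata_size, digest, is_deletion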
91 changes: 72 additions & 19 deletions python/plasma/test/test.py
@@ -17,6 +17,7 @@

import plasma
from plasma.utils import random_object_id, generate_metadata, write_to_data_buffer, create_object_with_id, create_object
from ray import services

USE_VALGRIND = False
PLASMA_STORE_MEMORY = 1000000000
@@ -492,19 +493,8 @@ def setUp(self):
store_name1, self.p2 = plasma.start_plasma_store(use_valgrind=USE_VALGRIND)
store_name2, self.p3 = plasma.start_plasma_store(use_valgrind=USE_VALGRIND)
# Start a Redis server.
redis_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../../core/src/common/thirdparty/redis/src/redis-server")
redis_module = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../../core/src/common/redis_module/libray_redis_module.so")
assert os.path.isfile(redis_path)
assert os.path.isfile(redis_module)
redis_port = 6379
with open(os.devnull, "w") as FNULL:
self.redis_process = subprocess.Popen([redis_path,
"--port", str(redis_port),
"--loadmodule", redis_module],
stdout=FNULL)
time.sleep(0.1)
redis_address = services.start_redis("127.0.0.1")
# Start two PlasmaManagers.
redis_address = "{}:{}".format("127.0.0.1", redis_port)
manager_name1, self.p4, self.port1 = plasma.start_plasma_manager(store_name1, redis_address, use_valgrind=USE_VALGRIND)
manager_name2, self.p5, self.port2 = plasma.start_plasma_manager(store_name2, redis_address, use_valgrind=USE_VALGRIND)
# Connect two PlasmaClients.
@@ -533,12 +523,11 @@ def tearDown(self):
else:
for process in self.processes_to_kill:
process.kill()
self.redis_process.kill()

# Clean up the Redis server.
services.cleanup()

def test_fetch(self):
if self.redis_process is None:
print("Cannot test fetch without a running redis instance.")
self.assertTrue(False)
for _ in range(10):
# Create an object.
object_id1, memory_buffer1, metadata1 = create_object(self.client1, 2000, 2000)
@@ -582,9 +571,6 @@ def test_fetch(self):
memory_buffer=memory_buffer3, metadata=metadata3)

def test_fetch_multiple(self):
if self.redis_process is None:
print("Cannot test fetch without a running redis instance.")
self.assertTrue(False)
for _ in range(20):
# Create two objects and a third fake one that doesn't exist.
object_id1, memory_buffer1, metadata1 = create_object(self.client1, 2000, 2000)
@@ -797,6 +783,73 @@ def test_stresstest(self):

print("it took", b, "seconds to put and transfer the objects")

class TestPlasmaManagerRecovery(unittest.TestCase):

def setUp(self):
# Start a Plasma store.
self.store_name, self.p2 = plasma.start_plasma_store(use_valgrind=USE_VALGRIND)
# Start a Redis server.
self.redis_address = services.start_redis("127.0.0.1")
# Start a PlasmaManager.
manager_name, self.p3, self.port1 = plasma.start_plasma_manager(
self.store_name,
self.redis_address,
use_valgrind=USE_VALGRIND)
# Connect a PlasmaClient.
self.client = plasma.PlasmaClient(self.store_name, manager_name)

# Store the processes that will be explicitly killed during tearDown so
# that a test case can remove ones that will be killed during the test.
self.processes_to_kill = [self.p2, self.p3]

def tearDown(self):
# Check that the processes are still alive.
for process in self.processes_to_kill:
self.assertEqual(process.poll(), None)

# Kill the Plasma store and Plasma manager processes.
if USE_VALGRIND:
time.sleep(1) # give processes opportunity to finish work
for process in self.processes_to_kill:
process.send_signal(signal.SIGTERM)
process.wait()
if process.returncode != 0:
print("aborting due to valgrind error")
os._exit(-1)
else:
for process in self.processes_to_kill:
process.kill()

# Clean up the Redis server.
services.cleanup()

def test_delayed_start(self):
num_objects = 10
# Create some objects using one client.
object_ids = [random_object_id() for _ in range(num_objects)]
for i in range(num_objects):
create_object_with_id(self.client, object_ids[i], 2000, 2000)

# Wait until the objects have been sealed in the store.
ready, waiting = self.client.wait(object_ids, num_returns=num_objects)
self.assertEqual(set(ready), set(object_ids))
self.assertEqual(waiting, [])

# Start a second plasma manager attached to the same store.
manager_name, self.p5, self.port2 = plasma.start_plasma_manager(self.store_name, self.redis_address, use_valgrind=USE_VALGRIND)
self.processes_to_kill.append(self.p5)

# Check that the second manager knows about existing objects.
client2 = plasma.PlasmaClient(self.store_name, manager_name)
ready, waiting = [], object_ids
while True:
ready, waiting = client2.wait(object_ids, num_returns=num_objects, timeout=0)
if len(ready) == len(object_ids):
break

self.assertEqual(set(ready), set(object_ids))
self.assertEqual(waiting, [])

if __name__ == "__main__":
if len(sys.argv) > 1:
# pop the argument so we don't mess with unittest's own argument parser
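The recovery test above deliberately loops on client2.wait with timeout=0 rather than sleeping for a fixed interval, because notifications for pre-existing objects reach the new manager asynchronously and a hard-coded timeout makes the test flaky. Here is a sketch of the same polling pattern with an overall deadline and a short sleep added so the loop does not spin; the wait signature is taken from the test, the rest is illustrative.

import time

def wait_until_known(client, object_ids, deadline_s=30.0, poll_s=0.01):
    # Poll the client's wait() until its manager knows about every object
    # in object_ids, or give up once the deadline expires.
    ready, waiting = [], object_ids
    start = time.time()
    while time.time() - start < deadline_s:
        # timeout=0 turns wait() into a non-blocking status check.
        ready, waiting = client.wait(object_ids,
                                     num_returns=len(object_ids),
                                     timeout=0)
        if not waiting:
            return ready
        time.sleep(poll_s)
    raise RuntimeError("objects never became available: {}".format(waiting))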
2 changes: 1 addition & 1 deletion python/ray/services.py
@@ -190,7 +190,7 @@ def start_redis(node_ip_address, num_retries=20, cleanup=True, redirect_output=F
/dev/null.
Returns:
The port used by Redis.
The address used by Redis.
Raises:
Exception: An exception is raised if Redis could not be started.
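The docstring fix above records the return value that the updated test cases rely on: start_redis hands back the full "ip:port" address string, which can be passed straight to start_plasma_manager. A small usage sketch, assuming a local Redis binary and the ray.services module from this tree:

from ray import services

# start_redis returns an "ip:port" address string, not just a port.
redis_address = services.start_redis("127.0.0.1")
host, port = redis_address.split(":")
print("Redis is listening on {}:{}".format(host, port))

# Tests shut the server down again in tearDown via cleanup().
services.cleanup()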
61 changes: 31 additions & 30 deletions src/plasma/plasma_store.c
@@ -52,12 +52,16 @@ struct client {
* object_table_entry type. */
UT_icd client_icd = {sizeof(client *), NULL, NULL, NULL};

/* This is used to define the queue of object notifications for plasma
* subscribers. */
UT_icd object_info_icd = {sizeof(object_info), NULL, NULL, NULL};

typedef struct {
/** Client file descriptor. This is used as a key for the hash table. */
int subscriber_fd;
/** The object IDs to notify the client about. We notify the client about the
* IDs in the order that the objects were sealed. */
UT_array *object_ids;
/** The object notifications for clients. We notify the client about the
* objects in the order that the objects were sealed or deleted. */
UT_array *object_notifications;
/** Handle for the uthash table. */
UT_hash_handle hh;
} notification_queue;
@@ -136,7 +140,8 @@ plasma_store_state *init_plasma_store(event_loop *loop, int64_t system_memory) {
return state;
}

void push_notification(plasma_store_state *state, object_id object_id);
void push_notification(plasma_store_state *state,
object_info *object_notification);

/* If this client is not already using the object, add the client to the
* object's list of clients, otherwise do nothing. */
@@ -562,7 +567,7 @@ void seal_object(client *client_context,
/* Set the object digest. */
memcpy(entry->info.digest, digest, DIGEST_SIZE);
/* Inform all subscribers that a new object has been sealed. */
push_notification(plasma_state, object_id);
push_notification(plasma_state, &entry->info);

/* Update all get requests that involve this object. */
update_object_get_requests(plasma_state, object_id);
@@ -589,7 +594,8 @@ void delete_object(plasma_store_state *plasma_state, object_id object_id) {
utarray_free(entry->clients);
free(entry);
/* Inform all subscribers that the object has been deleted. */
push_notification(plasma_state, object_id);
object_info notification = {.obj_id = object_id, .is_deletion = true};
push_notification(plasma_state, &notification);
}

void remove_objects(plasma_store_state *plasma_state,
@@ -605,10 +611,11 @@
}
}

void push_notification(plasma_store_state *plasma_state, object_id object_id) {
void push_notification(plasma_store_state *plasma_state,
object_info *notification) {
notification_queue *queue, *temp_queue;
HASH_ITER(hh, plasma_state->pending_notifications, queue, temp_queue) {
utarray_push_back(queue->object_ids, &object_id);
utarray_push_back(queue->object_notifications, notification);
send_notifications(plasma_state->loop, queue->subscriber_fd, plasma_state,
0);
}
@@ -627,26 +634,13 @@ void send_notifications(event_loop *loop,
int num_processed = 0;
/* Loop over the array of pending notifications and send as many of them as
* possible. */
for (int i = 0; i < utarray_len(queue->object_ids); ++i) {
object_id *obj_id = (object_id *) utarray_eltptr(queue->object_ids, i);
object_table_entry *entry = NULL;
/* This object should already exist in plasma store state. */
HASH_FIND(handle, plasma_state->plasma_store_info->objects, obj_id,
sizeof(object_id), entry);

object_info object_info;
if (entry == NULL) {
memset(&object_info, 0, sizeof(object_info));
object_info.obj_id = *obj_id;
object_info.is_deletion = true;
} else {
object_info = entry->info;
object_info.is_deletion = false;
}
for (int i = 0; i < utarray_len(queue->object_notifications); ++i) {
object_info *notification =
(object_info *) utarray_eltptr(queue->object_notifications, i);

/* Attempt to send a notification about this object ID. */
int nbytes =
send(client_sock, (char const *) &object_info, sizeof(object_info), 0);
send(client_sock, (char const *) notification, sizeof(object_info), 0);
if (nbytes >= 0) {
CHECK(nbytes == sizeof(object_info));
} else if (nbytes == -1 &&
@@ -667,9 +661,9 @@
num_processed += 1;
}
/* Remove the sent notifications from the array. */
utarray_erase(queue->object_ids, 0, num_processed);
utarray_erase(queue->object_notifications, 0, num_processed);
/* If we have sent all notifications, remove the fd from the event loop. */
if (utarray_len(queue->object_ids) == 0) {
if (utarray_len(queue->object_notifications) == 0) {
event_loop_remove_file(loop, client_sock);
}
}
@@ -686,16 +680,23 @@ void subscribe_to_updates(client *client_context, int conn) {
LOG_WARN("Failed to receive file descriptor from client on fd %d.", conn);
return;
}
CHECKM(HASH_CNT(handle, plasma_state->plasma_store_info->objects) == 0,
"plasma_subscribe should be called before any objects are created.");

/* Create a new array to buffer notifications that can't be sent to the
* subscriber yet because the socket send buffer is full. TODO(rkn): the queue
* never gets freed. */
notification_queue *queue =
(notification_queue *) malloc(sizeof(notification_queue));
queue->subscriber_fd = fd;
utarray_new(queue->object_ids, &object_id_icd);
utarray_new(queue->object_notifications, &object_info_icd);
HASH_ADD_INT(plasma_state->pending_notifications, subscriber_fd, queue);

/* Push notifications to the new subscriber about existing objects. */
object_table_entry *entry, *temp_entry;
HASH_ITER(handle, plasma_state->plasma_store_info->objects, entry,
temp_entry) {
utarray_push_back(queue->object_notifications, &entry->info);
}
send_notifications(plasma_state->loop, queue->subscriber_fd, plasma_state, 0);
}

void process_message(event_loop *loop,
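Taken together, the plasma_store.c changes mean subscribe_to_updates now replays a notification for every object already in the store before streaming live seal and delete events, which is what lets the late-starting manager in the new test catch up. Replayed and live notifications are identical on the wire, so a subscriber can consume both uniformly; a sketch building on the read_notification helper above, with the same assumed field names:

def notification_stream(sock):
    # Yield notifications forever: first the replayed backlog of objects
    # that existed before we subscribed, then live seal/delete events.
    while True:
        object_id, data_size, metadata_size, digest, is_deletion = \
            read_notification(sock)
        if is_deletion:
            yield ("deleted", object_id)
        else:
            yield ("sealed", object_id, data_size, metadata_size)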
