Skip to content

Commit

Permalink
Throw Python exception if plasma store cannot create new object. (ray…
Browse files Browse the repository at this point in the history
…-project#162)

* Propagate error messages through plasma create.

* Use custom exception types instead of exception messages.
  • Loading branch information
robertnishihara authored and stephanie-wang committed Dec 28, 2016
1 parent 10e067e commit baf835e
Show file tree
Hide file tree
Showing 12 changed files with 190 additions and 65 deletions.
7 changes: 3 additions & 4 deletions lib/python/ray/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,12 +421,11 @@ def put_object(self, objectid, value):
size = size + 4096 * 4 + 8 # The last 8 bytes are for the metadata offset. This is temporary.
try:
buff = self.plasma_client.create(objectid.id(), size, bytearray(schema))
except RuntimeError as e:
if e.args != ("an object with this ID could not be created",):
raise
except plasma.plasma_object_exists_error as e:
# The object already exists in the object store, so there is no need to
# add it again. TODO(rkn): We need to compare the hashes and make sure
# that the objects are in fact the same.
# that the objects are in fact the same. We also should return an error
# code to the caller instead of printing a message.
print("This object already exists in the object store.")
return
data = np.frombuffer(buff.buffer, dtype="byte")[8:]
Expand Down
14 changes: 10 additions & 4 deletions src/plasma/eviction_policy.c
Original file line number Diff line number Diff line change
Expand Up @@ -154,34 +154,40 @@ void object_created(eviction_state *eviction_state,
add_object_to_lru_cache(eviction_state, obj_id);
}

void require_space(eviction_state *eviction_state,
bool require_space(eviction_state *eviction_state,
plasma_store_info *plasma_store_info,
int64_t size,
int64_t *num_objects_to_evict,
object_id **objects_to_evict) {
/* Check if there is enough space to create the object. */
int64_t required_space =
eviction_state->memory_used + size - plasma_store_info->memory_capacity;
int64_t num_bytes_evicted;
if (required_space > 0) {
/* Try to free up at least as much space as we need right now but ideally
* up to 20% of the total capacity. */
int64_t space_to_free = MAX(size, plasma_store_info->memory_capacity / 5);
LOG_DEBUG("not enough space to create this object, so evicting objects");
/* Choose some objects to evict, and update the return pointers. */
int64_t num_bytes_evicted = choose_objects_to_evict(
num_bytes_evicted = choose_objects_to_evict(
eviction_state, plasma_store_info, space_to_free, num_objects_to_evict,
objects_to_evict);
printf("Evicted %" PRId64 " bytes.\n", num_bytes_evicted);
LOG_INFO(
"There is not enough space to create this object, so evicting "
"%" PRId64 " objects to free up %" PRId64 " bytes.\n",
*num_objects_to_evict, num_bytes_evicted);
CHECK(num_bytes_evicted >= required_space);
} else {
num_bytes_evicted = 0;
*num_objects_to_evict = 0;
*objects_to_evict = NULL;
}
eviction_state->memory_used += size;
if (num_bytes_evicted >= required_space) {
/* We only increment the space used if there is enough space to create the
* object. */
eviction_state->memory_used += size;
}
return num_bytes_evicted >= required_space;
}

void begin_object_access(eviction_state *eviction_state,
Expand Down
4 changes: 2 additions & 2 deletions src/plasma/eviction_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ void object_created(eviction_state *eviction_state,
* stored at this address. If the number of objects chosen is greater
* than 0, then the caller needs to free that array. If it equals 0, then
* the array will be NULL.
* @return Void.
* @return True if enough space can be freed and false otherwise.
*/
void require_space(eviction_state *eviction_state,
bool require_space(eviction_state *eviction_state,
plasma_store_info *plasma_store_info,
int64_t size,
int64_t *num_objects_to_evict,
Expand Down
4 changes: 3 additions & 1 deletion src/plasma/format/plasma.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ enum PlasmaError:int {
// Trying to create an object that already exists.
ObjectExists,
// Trying to access an object that doesn't exist.
ObjectNonexistent
ObjectNonexistent,
// Trying to create an object but there isn't enough space in the store.
OutOfMemory
}

// Plasma store messages
Expand Down
9 changes: 8 additions & 1 deletion src/plasma/plasma/plasma.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import time

from . import libplasma
from .libplasma import plasma_object_exists_error
from .libplasma import plasma_out_of_memory_error

PLASMA_ID_SIZE = 20
PLASMA_WAIT_TIMEOUT = 2 ** 30
Expand Down Expand Up @@ -128,7 +130,12 @@ def create(self, object_id, size, metadata=None):
wishes to encode.
Raises:
Exception: An exception is raised if the object could not be created.
plasma_object_exists_error: This exception is raised if the object could
not be created because there already is an object with the same ID in
the plasma store.
plasma_out_of_memory_error: This exception is raised if the object could
not be created because the plasma store is unable to evict enough
objects to create room for it.
"""
# Turn the metadata into the right type.
metadata = bytearray(b"") if metadata is None else metadata
Expand Down
22 changes: 13 additions & 9 deletions src/plasma/plasma_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -216,12 +216,12 @@ void increment_object_count(plasma_connection *conn,
object_entry->count += 1;
}

bool plasma_create(plasma_connection *conn,
object_id obj_id,
int64_t data_size,
uint8_t *metadata,
int64_t metadata_size,
uint8_t **data) {
int plasma_create(plasma_connection *conn,
object_id obj_id,
int64_t data_size,
uint8_t *metadata,
int64_t metadata_size,
uint8_t **data) {
LOG_DEBUG("called plasma_create on conn %d with size %" PRId64
" and metadata size %" PRId64,
conn->store_conn, data_size, metadata_size);
Expand All @@ -234,10 +234,14 @@ bool plasma_create(plasma_connection *conn,
plasma_object object;
plasma_read_CreateReply(reply_data, &id, &object, &error);
free(reply_data);
if (error == PlasmaError_ObjectExists) {
if (error != PlasmaError_OK) {
LOG_DEBUG("returned from plasma_create with error %d", error);
return false;
CHECK(error == PlasmaError_OutOfMemory ||
error == PlasmaError_ObjectExists);
return error;
}
/* If the CreateReply included an error, then the store will not send a file
* descriptor. */
int fd = recv_fd(conn->store_conn);
CHECKM(fd >= 0, "recv not successful");
CHECK(object.data_size == data_size);
Expand All @@ -263,7 +267,7 @@ bool plasma_create(plasma_connection *conn,
* returned by plasma_create goes out of scope, the object does not get
* released before the call to plasma_seal happens. */
increment_object_count(conn, obj_id, &object, false);
return true;
return PlasmaError_OK;
}

/* This method is used to get both the data and the metadata. */
Expand Down
22 changes: 14 additions & 8 deletions src/plasma/plasma_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,21 @@ int plasma_manager_connect(const char *addr, int port);
* @param metadata_size The size in bytes of the metadata. If there is no
metadata, this should be 0.
* @param data The address of the newly created object will be written here.
* @return True, if object was created, false, otherwise (e.g., if object has
* been already created).
* @return One of the following error codes:
* - PlasmaError_OK, if the object was created successfully.
* - PlasmaError_ObjectExists, if an object with this ID is already
* present in the store. In this case, the client should not call
* plasma_release.
* - PlasmaError_OutOfMemory, if the store is out of memory and cannot
* create the object. In this case, the client should not call
* plasma_release.
*/
bool plasma_create(plasma_connection *conn,
object_id object_id,
int64_t size,
uint8_t *metadata,
int64_t metadata_size,
uint8_t **data);
int plasma_create(plasma_connection *conn,
object_id object_id,
int64_t size,
uint8_t *metadata,
int64_t metadata_size,
uint8_t **data);

/**
* Get an object from the Plasma Store. This function will block until the
Expand Down
36 changes: 30 additions & 6 deletions src/plasma/plasma_extension.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
#include "plasma_client.h"
#include "object_info.h"

PyObject *PlasmaOutOfMemoryError;
PyObject *PlasmaObjectExistsError;

static int PyObjectToPlasmaConnection(PyObject *object,
plasma_connection **conn) {
if (PyCapsule_IsValid(object, "plasma")) {
Expand Down Expand Up @@ -69,14 +72,22 @@ PyObject *PyPlasma_create(PyObject *self, PyObject *args) {
return NULL;
}
uint8_t *data;
bool created = plasma_create(conn, object_id, size,
(uint8_t *) PyByteArray_AsString(metadata),
PyByteArray_Size(metadata), &data);
if (!created) {
PyErr_SetString(PyExc_RuntimeError,
"an object with this ID could not be created");
int error_code = plasma_create(conn, object_id, size,
(uint8_t *) PyByteArray_AsString(metadata),
PyByteArray_Size(metadata), &data);
if (error_code == PlasmaError_ObjectExists) {
PyErr_SetString(PlasmaObjectExistsError,
"An object with this ID already exists in the plasma "
"store.");
return NULL;
}
if (error_code == PlasmaError_OutOfMemory) {
PyErr_SetString(PlasmaOutOfMemoryError,
"The plasma store ran out of memory and could not create "
"this object.");
return NULL;
}
CHECK(error_code == PlasmaError_OK);

#if PY_MAJOR_VERSION >= 3
return PyMemoryView_FromMemory((void *) data, (Py_ssize_t) size, PyBUF_WRITE);
Expand Down Expand Up @@ -422,6 +433,19 @@ MOD_INIT(libplasma) {
"A Python client library for plasma.");
#endif

/* Create a custom exception for when an object ID is reused. */
char plasma_object_exists_error[] = "plasma_object_exists.error";
PlasmaObjectExistsError =
PyErr_NewException(plasma_object_exists_error, NULL, NULL);
Py_INCREF(PlasmaObjectExistsError);
PyModule_AddObject(m, "plasma_object_exists_error", PlasmaObjectExistsError);
/* Create a custom exception for when the plasma store is out of memory. */
char plasma_out_of_memory_error[] = "plasma_out_of_memory.error";
PlasmaOutOfMemoryError =
PyErr_NewException(plasma_out_of_memory_error, NULL, NULL);
Py_INCREF(PlasmaOutOfMemoryError);
PyModule_AddObject(m, "plasma_out_of_memory_error", PlasmaOutOfMemoryError);

#if PY_MAJOR_VERSION >= 3
return m;
#endif
Expand Down
9 changes: 4 additions & 5 deletions src/plasma/plasma_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -792,13 +792,12 @@ void process_data_request(event_loop *loop,

/* The corresponding call to plasma_release should happen in
* process_data_chunk. */
bool success_create =
plasma_create(conn->manager_state->plasma_conn, object_id, data_size,
NULL, metadata_size, &(buf->data));
int error_code = plasma_create(conn->manager_state->plasma_conn, object_id,
data_size, NULL, metadata_size, &(buf->data));
/* If success_create == true, a new object has been created.
* If success_create == false the object creation has failed, possibly
* due to an object with the same ID already existing in the Plasma Store. */
if (success_create) {
if (error_code == PlasmaError_OK) {
/* Add buffer where the fetched data is to be stored to
* conn->transfer_queue. */
LL_APPEND(conn->transfer_queue, buf);
Expand All @@ -808,7 +807,7 @@ void process_data_request(event_loop *loop,
/* Switch to reading the data from this socket, instead of listening for
* other requests. */
event_loop_remove_file(loop, client_sock);
if (success_create) {
if (error_code == PlasmaError_OK) {
event_loop_add_file(loop, client_sock, EVENT_LOOP_READ, process_data_chunk,
conn);
} else {
Expand Down
37 changes: 19 additions & 18 deletions src/plasma/plasma_store.c
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,11 @@ void add_client_to_object_clients(object_table_entry *entry,
}

/* Create a new object buffer in the hash table. */
bool create_object(client *client_context,
object_id obj_id,
int64_t data_size,
int64_t metadata_size,
plasma_object *result) {
int create_object(client *client_context,
object_id obj_id,
int64_t data_size,
int64_t metadata_size,
plasma_object *result) {
LOG_DEBUG("creating object"); /* TODO(pcm): add object_id here */
plasma_store_state *plasma_state = client_context->plasma_state;
object_table_entry *entry;
Expand All @@ -153,15 +153,20 @@ bool create_object(client *client_context,
if (entry != NULL) {
/* There is already an object with the same ID in the Plasma Store, so
* ignore this requst. */
return false;
return PlasmaError_ObjectExists;
}
/* Tell the eviction policy how much space we need to create this object. */
int64_t num_objects_to_evict;
object_id *objects_to_evict;
require_space(plasma_state->eviction_state, plasma_state->plasma_store_info,
data_size + metadata_size, &num_objects_to_evict,
&objects_to_evict);
bool success = require_space(
plasma_state->eviction_state, plasma_state->plasma_store_info,
data_size + metadata_size, &num_objects_to_evict, &objects_to_evict);
remove_objects(plasma_state, num_objects_to_evict, objects_to_evict);
/* Return an error to the client if not enough space could be freed to create
* the object. */
if (!success) {
return PlasmaError_OutOfMemory;
}
/* Allocate space for the new object */
uint8_t *pointer = dlmalloc(data_size + metadata_size);
int fd;
Expand Down Expand Up @@ -198,7 +203,7 @@ bool create_object(client *client_context,
obj_id);
/* Record that this client is using this object. */
add_client_to_object_clients(entry, client_context);
return true;
return PlasmaError_OK;
}

/* Get an object from the hash table. */
Expand Down Expand Up @@ -506,15 +511,11 @@ void process_message(event_loop *loop,
int64_t metadata_size;
plasma_read_CreateRequest(input, &object_ids[0], &data_size,
&metadata_size);
if (create_object(client_context, object_ids[0], data_size, metadata_size,
&objects[0])) {
error = PlasmaError_OK;
} else {
error = PlasmaError_ObjectExists;
}
int error_code = create_object(client_context, object_ids[0], data_size,
metadata_size, &objects[0]);
CHECK(plasma_send_CreateReply(client_sock, state->builder, object_ids[0],
&objects[0], error) >= 0);
if (error == PlasmaError_OK) {
&objects[0], error_code) >= 0);
if (error_code == PlasmaError_OK) {
CHECK(send_fd(client_sock, objects[0].handle.store_fd) >= 0);
}
} break;
Expand Down
19 changes: 13 additions & 6 deletions src/plasma/plasma_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,20 @@ typedef struct plasma_store_state plasma_store_state;
* @param object_id Object ID of the object to be created.
* @param data_size Size in bytes of the object to be created.
* @param metadata_size Size in bytes of the object metadata.
* @return False if the object already exists, otherwise true.
* @return One of the following error codes:
* - PlasmaError_OK, if the object was created successfully.
* - PlasmaError_ObjectExists, if an object with this ID is already
* present in the store. In this case, the client should not call
* plasma_release.
* - PlasmaError_OutOfMemory, if the store is out of memory and cannot
* create the object. In this case, the client should not call
* plasma_release.
*/
bool create_object(client *client_context,
object_id object_id,
int64_t data_size,
int64_t metadata_size,
plasma_object *result);
int create_object(client *client_context,
object_id object_id,
int64_t data_size,
int64_t metadata_size,
plasma_object *result);

/**
* Get an object. This method assumes that we currently have or will eventually
Expand Down
Loading

0 comments on commit baf835e

Please sign in to comment.