From 9b3ce3e64bfaa6410aae2f1126eb163a30a2ab36 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Fri, 22 Feb 2019 18:21:01 -0800 Subject: [PATCH] Revert inline objects PR (#4125) * Revert "Inline objects (#3756)" This reverts commit f9875727954effb90d6cf74158559d68bc58c5c0. * fix rebase problems * more rebase fixes * add back debug statement --- .../src/main/resources/ray.default.conf | 2 - python/ray/tests/test_basic.py | 58 +-------- python/ray/tests/test_object_manager.py | 7 +- src/ray/gcs/format/gcs.fbs | 8 -- src/ray/object_manager/object_directory.cc | 112 ++++-------------- src/ray/object_manager/object_directory.h | 29 ++--- src/ray/object_manager/object_manager.cc | 89 ++------------ src/ray/object_manager/object_manager.h | 21 +--- .../test/object_manager_stress_test.cc | 12 +- .../test/object_manager_test.cc | 32 ++--- src/ray/ray_config_def.h | 5 - src/ray/raylet/node_manager.cc | 16 +-- src/ray/raylet/node_manager.h | 6 +- src/ray/raylet/raylet.cc | 7 +- src/ray/raylet/raylet.h | 4 - src/ray/raylet/reconstruction_policy.cc | 6 +- src/ray/raylet/reconstruction_policy_test.cc | 10 +- src/ray/test/run_object_manager_valgrind.sh | 3 - 18 files changed, 75 insertions(+), 352 deletions(-) diff --git a/java/runtime/src/main/resources/ray.default.conf b/java/runtime/src/main/resources/ray.default.conf index 6a3f95a0149e..f5a23c73d8c8 100644 --- a/java/runtime/src/main/resources/ray.default.conf +++ b/java/runtime/src/main/resources/ray.default.conf @@ -92,8 +92,6 @@ ray { // See src/ray/ray_config_def.h for options. config { - // Maximum size of an inline object (bytes). - inline_object_max_size_bytes: 0 } } diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index 370dfc018809..b1f88dd549be 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -1151,14 +1151,8 @@ def test_object_transfer_dump(ray_start_cluster): cluster = ray_start_cluster num_nodes = 3 - # Set the inline object size to 0 to force all objects to be written to - # plasma. - config = json.dumps({"inline_object_max_size_bytes": 0}) for i in range(num_nodes): - cluster.add_node( - resources={str(i): 1}, - object_store_memory=10**9, - _internal_config=config) + cluster.add_node(resources={str(i): 1}, object_store_memory=10**9) ray.init(redis_address=cluster.redis_address) @ray.remote @@ -2659,56 +2653,6 @@ def f(): assert len(ready_ids) == 1 -def test_inline_objects(shutdown_only): - config = json.dumps({"initial_reconstruction_timeout_milliseconds": 200}) - ray.init(num_cpus=1, object_store_memory=10**7, _internal_config=config) - - @ray.remote - class Actor(object): - def create_inline_object(self): - return "inline" - - def create_non_inline_object(self): - return 10000 * [1] - - def get(self): - return - - a = Actor.remote() - # Count the number of objects that were successfully inlined. - inlined = 0 - for _ in range(100): - inline_object = a.create_inline_object.remote() - ray.get(inline_object) - plasma_id = ray.pyarrow.plasma.ObjectID(inline_object.binary()) - ray.worker.global_worker.plasma_client.delete([plasma_id]) - # Make sure we can still get an inlined object created by an actor even - # after it has been evicted. - try: - value = ray.get(inline_object) - assert value == "inline" - inlined += 1 - except ray.exceptions.UnreconstructableError: - pass - # Make sure some objects were inlined. Some of them may not get inlined - # because we evict the object soon after creating it. - assert inlined > 0 - - # Non-inlined objects are not able to be recreated after eviction. - for _ in range(10): - non_inline_object = a.create_non_inline_object.remote() - ray.get(non_inline_object) - plasma_id = ray.pyarrow.plasma.ObjectID(non_inline_object.binary()) - # This while loop is necessary because sometimes the object is still - # there immediately after plasma_client.delete. - while ray.worker.global_worker.plasma_client.contains(plasma_id): - ray.worker.global_worker.plasma_client.delete([plasma_id]) - # Objects created by an actor that were evicted and larger than the - # maximum inline object size cannot be retrieved or reconstructed. - with pytest.raises(ray.exceptions.UnreconstructableError): - ray.get(non_inline_object) == 10000 * [1] - - def test_ray_setproctitle(shutdown_only): ray.init(num_cpus=2) diff --git a/python/ray/tests/test_object_manager.py b/python/ray/tests/test_object_manager.py index 757be18979a3..7c4ffa92f9de 100644 --- a/python/ray/tests/test_object_manager.py +++ b/python/ray/tests/test_object_manager.py @@ -210,7 +210,7 @@ def set_weights(self, x): def test_object_transfer_retry(ray_start_empty_cluster): cluster = ray_start_empty_cluster - repeated_push_delay = 10 + repeated_push_delay = 4 # Force the sending object manager to allow duplicate pushes again sooner. # Also, force the receiving object manager to retry the Pull sooner. We @@ -262,6 +262,11 @@ def f(size): ray.worker.global_worker.plasma_client.contains( ray.pyarrow.plasma.ObjectID(x_id.binary())) for x_id in x_ids) + end_time = time.time() + # Make sure that the first time the objects get transferred, it happens + # quickly. + assert end_time - start_time < repeated_push_delay + # Get the objects again and make sure they get transferred. xs = ray.get(x_ids) end_transfer_time = time.time() diff --git a/src/ray/gcs/format/gcs.fbs b/src/ray/gcs/format/gcs.fbs index fcb70ab37e74..c826d97a66de 100644 --- a/src/ray/gcs/format/gcs.fbs +++ b/src/ray/gcs/format/gcs.fbs @@ -122,18 +122,10 @@ table FunctionTableData { table ObjectTableData { // The size of the object. object_size: long; - // Is object in-lined? Inline objects are objects whose data and metadata are - // inlined in the GCS object table entry, which normally only specifies - // the object location. - inline_object_flag: bool; // The node manager ID that this object appeared on or was evicted by. manager: string; // Whether this entry is an addition or a deletion. is_eviction: bool; - // In-line object data. - inline_object_data: [ubyte]; - // In-line object metadata. - inline_object_metadata: [ubyte]; } table TaskReconstructionData { diff --git a/src/ray/object_manager/object_directory.cc b/src/ray/object_manager/object_directory.cc index 6a2535025eba..7692ce505163 100644 --- a/src/ray/object_manager/object_directory.cc +++ b/src/ray/object_manager/object_directory.cc @@ -8,21 +8,15 @@ ObjectDirectory::ObjectDirectory(boost::asio::io_service &io_service, namespace { -/// Process a suffix of the object table log. -/// If object is inlined (inline_object_flag = TRUE), its data and metadata are -/// stored with the object's entry so we read them into inline_object_data, and -/// inline_object_metadata, respectively. -/// If object is not inlined, store the result in client_ids. -/// This assumes that client_ids already contains the result of the -/// object table log up to but not including this suffix. -/// This function also stores a bool in has_been_created indicating whether the -/// object has ever been created before. +/// Process a suffix of the object table log and store the result in +/// client_ids. This assumes that client_ids already contains the result of the +/// object table log up to but not including this suffix. This also stores a +/// bool in has_been_created indicating whether the object has ever been +/// created before. void UpdateObjectLocations(const std::vector &location_history, const ray::gcs::ClientTable &client_table, std::unordered_set *client_ids, - bool *inline_object_flag, - std::vector *inline_object_data, - std::string *inline_object_metadata, bool *has_been_created) { + bool *has_been_created) { // location_history contains the history of locations of the object (it is a log), // which might look like the following: // client1.is_eviction = false @@ -30,9 +24,6 @@ void UpdateObjectLocations(const std::vector &location_history // client2.is_eviction = false // In such a scenario, we want to indicate client2 is the only client that contains // the object, which the following code achieves. - // - // If object is inlined each entry contains both the object's data and metadata, - // so we don't care about its location. if (!location_history.empty()) { // If there are entries, then the object has been created. Once this flag // is set to true, it should never go back to false. @@ -40,35 +31,18 @@ void UpdateObjectLocations(const std::vector &location_history } for (const auto &object_table_data : location_history) { ClientID client_id = ClientID::from_binary(object_table_data.manager); - if (object_table_data.inline_object_flag) { - if (!*inline_object_flag) { - // This is the first time we're receiving the inline object data. Read - // object's data from the GCS entry. - *inline_object_flag = object_table_data.inline_object_flag; - inline_object_data->assign(object_table_data.inline_object_data.begin(), - object_table_data.inline_object_data.end()); - inline_object_metadata->assign(object_table_data.inline_object_metadata.begin(), - object_table_data.inline_object_metadata.end()); - } - // We got the data and metadata of the object so exit the loop. - break; - } - if (!object_table_data.is_eviction) { client_ids->insert(client_id); } else { client_ids->erase(client_id); } } - - if (!*inline_object_flag) { - // Filter out the removed clients from the object locations. - for (auto it = client_ids->begin(); it != client_ids->end();) { - if (client_table.IsRemoved(*it)) { - it = client_ids->erase(it); - } else { - it++; - } + // Filter out the removed clients from the object locations. + for (auto it = client_ids->begin(); it != client_ids->end();) { + if (client_table.IsRemoved(*it)) { + it = client_ids->erase(it); + } else { + it++; } } } @@ -88,8 +62,6 @@ void ObjectDirectory::RegisterBackend() { // Update entries for this object. UpdateObjectLocations(location_history, gcs_client_->client_table(), &it->second.current_object_locations, - &it->second.inline_object_flag, &it->second.inline_object_data, - &it->second.inline_object_metadata, &it->second.has_been_created); // Copy the callbacks so that the callbacks can unsubscribe without interrupting // looping over the callbacks. @@ -102,8 +74,6 @@ void ObjectDirectory::RegisterBackend() { // It is safe to call the callback directly since this is already running // in the subscription callback stack. callback_pair.second(object_id, it->second.current_object_locations, - it->second.inline_object_flag, it->second.inline_object_data, - it->second.inline_object_metadata, it->second.has_been_created); } }; @@ -114,25 +84,13 @@ void ObjectDirectory::RegisterBackend() { ray::Status ObjectDirectory::ReportObjectAdded( const ObjectID &object_id, const ClientID &client_id, - const object_manager::protocol::ObjectInfoT &object_info, bool inline_object_flag, - const plasma::ObjectBuffer &plasma_buffer) { - RAY_LOG(DEBUG) << "Reporting object added to GCS " << object_id << " inline? " - << inline_object_flag; + const object_manager::protocol::ObjectInfoT &object_info) { + RAY_LOG(DEBUG) << "Reporting object added to GCS " << object_id; // Append the addition entry to the object table. auto data = std::make_shared(); data->manager = client_id.binary(); data->is_eviction = false; data->object_size = object_info.data_size; - data->inline_object_flag = inline_object_flag; - if (inline_object_flag) { - // Add object's data to its GCS entry. - data->inline_object_data.assign( - plasma_buffer.data->data(), - plasma_buffer.data->data() + plasma_buffer.data->size()); - data->inline_object_metadata.assign( - plasma_buffer.metadata->data(), - plasma_buffer.metadata->data() + plasma_buffer.metadata->size()); - } ray::Status status = gcs_client_->object_table().Append(JobID::nil(), object_id, data, nullptr); return status; @@ -184,19 +142,16 @@ void ObjectDirectory::HandleClientRemoved(const ClientID &client_id) { if (listener.second.current_object_locations.count(client_id) > 0) { // If the subscribed object has the removed client as a location, update // its locations with an empty log so that the location will be removed. - UpdateObjectLocations( - {}, gcs_client_->client_table(), &listener.second.current_object_locations, - &listener.second.inline_object_flag, &listener.second.inline_object_data, - &listener.second.inline_object_metadata, &listener.second.has_been_created); + UpdateObjectLocations({}, gcs_client_->client_table(), + &listener.second.current_object_locations, + &listener.second.has_been_created); // Re-call all the subscribed callbacks for the object, since its // locations have changed. for (const auto &callback_pair : listener.second.callbacks) { // It is safe to call the callback directly since this is already running // in the subscription callback stack. - callback_pair.second( - object_id, listener.second.current_object_locations, - listener.second.inline_object_flag, listener.second.inline_object_data, - listener.second.inline_object_metadata, listener.second.has_been_created); + callback_pair.second(object_id, listener.second.current_object_locations, + listener.second.has_been_created); } } } @@ -222,14 +177,8 @@ ray::Status ObjectDirectory::SubscribeObjectLocations(const UniqueID &callback_i // immediately notify the caller of the current known locations. if (listener_state.has_been_created) { auto &locations = listener_state.current_object_locations; - auto inline_object_flag = listener_state.inline_object_flag; - const auto &inline_object_data = listener_state.inline_object_data; - const auto &inline_object_metadata = listener_state.inline_object_metadata; - io_service_.post([callback, locations, inline_object_flag, inline_object_data, - inline_object_metadata, object_id]() { - callback(object_id, locations, inline_object_flag, inline_object_data, - inline_object_metadata, - /*has_been_created=*/true); + io_service_.post([callback, locations, object_id]() { + callback(object_id, locations, /*has_been_created=*/true); }); } return status; @@ -262,31 +211,20 @@ ray::Status ObjectDirectory::LookupLocations(const ObjectID &object_id, const std::vector &location_history) { // Build the set of current locations based on the entries in the log. std::unordered_set client_ids; - bool inline_object_flag = false; - std::vector inline_object_data; - std::string inline_object_metadata; bool has_been_created = false; UpdateObjectLocations(location_history, gcs_client_->client_table(), - &client_ids, &inline_object_flag, &inline_object_data, - &inline_object_metadata, &has_been_created); + &client_ids, &has_been_created); // It is safe to call the callback directly since this is already running // in the GCS client's lookup callback stack. - callback(object_id, client_ids, inline_object_flag, inline_object_data, - inline_object_metadata, has_been_created); + callback(object_id, client_ids, has_been_created); }); } else { // If we have locations cached due to a concurrent SubscribeObjectLocations // call, call the callback immediately with the cached locations. - // If object inlined, we already have the object's data. auto &locations = it->second.current_object_locations; bool has_been_created = it->second.has_been_created; - bool inline_object_flag = it->second.inline_object_flag; - const auto &inline_object_data = it->second.inline_object_data; - const auto &inline_object_metadata = it->second.inline_object_metadata; - io_service_.post([callback, object_id, locations, inline_object_flag, - inline_object_data, inline_object_metadata, has_been_created]() { - callback(object_id, locations, inline_object_flag, inline_object_data, - inline_object_metadata, has_been_created); + io_service_.post([callback, object_id, locations, has_been_created]() { + callback(object_id, locations, has_been_created); }); } return status; diff --git a/src/ray/object_manager/object_directory.h b/src/ray/object_manager/object_directory.h index f1c04de0daf5..0559ad534514 100644 --- a/src/ray/object_manager/object_directory.h +++ b/src/ray/object_manager/object_directory.h @@ -50,9 +50,9 @@ class ObjectDirectoryInterface { virtual std::vector LookupAllRemoteConnections() const = 0; /// Callback for object location notifications. - using OnLocationsFound = std::function &, bool, - const std::vector &, const std::string &, bool has_been_created)>; + using OnLocationsFound = std::function &, + bool has_been_created)>; /// Lookup object locations. Callback may be invoked with empty list of client ids. /// @@ -101,14 +101,10 @@ class ObjectDirectoryInterface { /// \param object_id The object id that was put into the store. /// \param client_id The client id corresponding to this node. /// \param object_info Additional information about the object. - /// \param inline_object_flag Flag specifying whether object is inlined. - /// \param plasma_buffer Object data and metadata from plasma. This data is - /// only valid for inlined objects (i.e., when inline_object_flag=true). /// \return Status of whether this method succeeded. virtual ray::Status ReportObjectAdded( const ObjectID &object_id, const ClientID &client_id, - const object_manager::protocol::ObjectInfoT &object_info, bool inline_object_flag, - const plasma::ObjectBuffer &plasma_buffer) = 0; + const object_manager::protocol::ObjectInfoT &object_info) = 0; /// Report objects removed from this client's store to the object directory. /// @@ -160,11 +156,9 @@ class ObjectDirectory : public ObjectDirectoryInterface { ray::Status UnsubscribeObjectLocations(const UniqueID &callback_id, const ObjectID &object_id) override; - ray::Status ReportObjectAdded(const ObjectID &object_id, const ClientID &client_id, - const object_manager::protocol::ObjectInfoT &object_info, - bool inline_object_flag, - const plasma::ObjectBuffer &plasma_buffer) override; - + ray::Status ReportObjectAdded( + const ObjectID &object_id, const ClientID &client_id, + const object_manager::protocol::ObjectInfoT &object_info) override; ray::Status ReportObjectRemoved(const ObjectID &object_id, const ClientID &client_id) override; @@ -182,15 +176,6 @@ class ObjectDirectory : public ObjectDirectoryInterface { std::unordered_map callbacks; /// The current set of known locations of this object. std::unordered_set current_object_locations; - /// Specify whether the object is inlined. The data and the metadata of - /// an inlined object are stored in the object's GCS entry. In this flag - /// (i.e., the object is inlined) the content of current_object_locations - /// can be ignored. - bool inline_object_flag; - /// Inlined object data, if inline_object_flag == true. - std::vector inline_object_data; - /// Inlined object metadata, if inline_object_flag == true. - std::string inline_object_metadata; /// This flag will get set to true if the object has ever been created. It /// should never go back to false once set to true. If this is true, and /// the current_object_locations is empty, then this means that the object diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index 3f82f3baffe5..b045f985def5 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -10,15 +10,13 @@ namespace ray { ObjectManager::ObjectManager(asio::io_service &main_service, const ObjectManagerConfig &config, - std::shared_ptr object_directory, - plasma::PlasmaClient &store_client) + std::shared_ptr object_directory) : config_(config), object_directory_(std::move(object_directory)), store_notification_(main_service, config_.store_socket_name), buffer_pool_(config_.store_socket_name, config_.object_chunk_size), send_work_(send_service_), receive_work_(receive_service_), - store_client_(store_client), connection_pool_(), gen_(std::chrono::high_resolution_clock::now().time_since_epoch().count()) { RAY_CHECK(config_.max_sends > 0); @@ -69,30 +67,9 @@ void ObjectManager::HandleObjectAdded( RAY_LOG(DEBUG) << "Object added " << object_id; RAY_CHECK(local_objects_.count(object_id) == 0); local_objects_[object_id].object_info = object_info; - // If this object was created from inlined data, this means it is already in GCS, - // so no need to write it again. - if (local_inlined_objects_.find(object_id) == local_inlined_objects_.end()) { - std::vector inline_object_data; - std::string inline_object_metadata; - bool inline_object_flag = false; - plasma::ObjectBuffer object_buffer; - if (object_info.data_size <= RayConfig::instance().inline_object_max_size_bytes()) { - // Inline object. Try to get the data from the object store. - plasma::ObjectID plasma_id = object_id.to_plasma_id(); - RAY_ARROW_CHECK_OK(store_client_.Get(&plasma_id, 1, 0, &object_buffer)); - if (object_buffer.data != nullptr) { - // The object exists. Set inline_object_flag so that the object data - // will be stored in the GCS entry. - inline_object_flag = true; - // Mark this object as inlined, so that if this object is later - // evicted, we do not report it to the GCS. - local_inlined_objects_.insert(object_id); - } - } + ray::Status status = + object_directory_->ReportObjectAdded(object_id, client_id_, object_info); - RAY_CHECK_OK(object_directory_->ReportObjectAdded(object_id, client_id_, object_info, - inline_object_flag, object_buffer)); - } // Handle the unfulfilled_push_requests_ which contains the push request that is not // completed due to unsatisfied local objects. auto iter = unfulfilled_push_requests_.find(object_id); @@ -114,16 +91,10 @@ void ObjectManager::HandleObjectAdded( } void ObjectManager::NotifyDirectoryObjectDeleted(const ObjectID &object_id) { - RAY_LOG(DEBUG) << "Object removed " << object_id; auto it = local_objects_.find(object_id); RAY_CHECK(it != local_objects_.end()); - if (local_inlined_objects_.find(object_id) == local_inlined_objects_.end()) { - // Inline object data can be retrieved by any node by contacting the GCS, - // so only report that the object was evicted if it wasn't inlined. - RAY_CHECK_OK(object_directory_->ReportObjectRemoved(object_id, client_id_)); - } local_objects_.erase(it); - local_inlined_objects_.erase(object_id); + ray::Status status = object_directory_->ReportObjectRemoved(object_id, client_id_); } ray::Status ObjectManager::SubscribeObjAdded( @@ -138,26 +109,6 @@ ray::Status ObjectManager::SubscribeObjDeleted( return ray::Status::OK(); } -void ObjectManager::PutInlineObject(const ObjectID &object_id, - const std::vector &inline_object_data, - const std::string &inline_object_metadata) { - if (local_objects_.find(object_id) == local_objects_.end()) { - // Inline object is not in the local object store. Create it from - // inline_object_data, and inline_object_metadata, respectively. - // - // Since this function is called on notification or when reading the - // object's entry from GCS, we know this object's entry is already in GCS. - // Remember this by adding the object to local_inlined_objects_. This way - // we avoid writing another copy of this object to GCS in HandleObjectAdded(). - local_inlined_objects_.insert(object_id); - auto status = store_client_.CreateAndSeal( - object_id.to_plasma_id(), - std::string(inline_object_data.begin(), inline_object_data.end()), - inline_object_metadata); - RAY_CHECK(status.IsPlasmaObjectExists() || status.ok()) << status.message(); - } -} - ray::Status ObjectManager::Pull(const ObjectID &object_id) { RAY_LOG(DEBUG) << "Pull on " << client_id_ << " of object " << object_id; // Check if object is already local. @@ -177,13 +128,7 @@ ray::Status ObjectManager::Pull(const ObjectID &object_id) { return object_directory_->SubscribeObjectLocations( object_directory_pull_callback_id_, object_id, [this](const ObjectID &object_id, const std::unordered_set &client_ids, - bool inline_object_flag, const std::vector &inline_object_data, - const std::string &inline_object_metadata, bool created) { - if (inline_object_flag) { - // This is an inlined object. Store it in the Plasma store and return. - PutInlineObject(object_id, inline_object_data, inline_object_metadata); - return; - } + bool created) { // Exit if the Pull request has already been fulfilled or canceled. auto it = pull_requests_.find(object_id); if (it == pull_requests_.end()) { @@ -635,19 +580,11 @@ ray::Status ObjectManager::LookupRemainingWaitObjects(const UniqueID &wait_id) { RAY_RETURN_NOT_OK(object_directory_->LookupLocations( object_id, [this, wait_id](const ObjectID &lookup_object_id, - const std::unordered_set &client_ids, - bool inline_object_flag, - const std::vector &inline_object_data, - const std::string &inline_object_metadata, bool created) { + const std::unordered_set &client_ids, bool created) { auto &wait_state = active_wait_requests_.find(wait_id)->second; - if (!client_ids.empty() || inline_object_flag) { + if (!client_ids.empty()) { wait_state.remaining.erase(lookup_object_id); wait_state.found.insert(lookup_object_id); - if (inline_object_flag) { - // This is an inlined object. Store it in the Plasma store and return. - PutInlineObject(lookup_object_id, inline_object_data, - inline_object_metadata); - } } RAY_LOG(DEBUG) << "Wait request " << wait_id << ": " << client_ids.size() << " locations found for object " << lookup_object_id; @@ -681,11 +618,8 @@ void ObjectManager::SubscribeRemainingWaitObjects(const UniqueID &wait_id) { RAY_CHECK_OK(object_directory_->SubscribeObjectLocations( wait_id, object_id, [this, wait_id](const ObjectID &subscribe_object_id, - const std::unordered_set &client_ids, - bool inline_object_flag, - const std::vector &inline_object_data, - const std::string &inline_object_metadata, bool created) { - if (!client_ids.empty() || inline_object_flag) { + const std::unordered_set &client_ids, bool created) { + if (!client_ids.empty()) { RAY_LOG(DEBUG) << "Wait request " << wait_id << ": subscription notification received for object " << subscribe_object_id; @@ -697,11 +631,6 @@ void ObjectManager::SubscribeRemainingWaitObjects(const UniqueID &wait_id) { // notification. return; } - if (inline_object_flag) { - // This is an inlined object. Store it in the Plasma store. - PutInlineObject(subscribe_object_id, inline_object_data, - inline_object_metadata); - } auto &wait_state = object_id_wait_state->second; wait_state.remaining.erase(subscribe_object_id); wait_state.found.insert(subscribe_object_id); diff --git a/src/ray/object_manager/object_manager.h b/src/ray/object_manager/object_manager.h index b170e072781f..29c75c57a773 100644 --- a/src/ray/object_manager/object_manager.h +++ b/src/ray/object_manager/object_manager.h @@ -76,12 +76,9 @@ class ObjectManager : public ObjectManagerInterface { /// \param main_service The main asio io_service. /// \param config ObjectManager configuration. /// \param object_directory An object implementing the object directory interface. - /// \param store_client Reference to Plasma store. This is used to get and put - /// inlined objects in the local object store. explicit ObjectManager(boost::asio::io_service &main_service, const ObjectManagerConfig &config, - std::shared_ptr object_directory, - plasma::PlasmaClient &store_client); + std::shared_ptr object_directory); ~ObjectManager(); @@ -354,12 +351,6 @@ class ObjectManager : public ObjectManagerInterface { /// Handle Push task timeout. void HandlePushTaskTimeout(const ObjectID &object_id, const ClientID &client_id); - /// Add inline object to object store. Called when reading the object entry - /// from GCS or upon receiving a notification about an inline object. - void PutInlineObject(const ObjectID &object_id, - const std::vector &inline_object_data, - const std::string &inline_object_metadata); - ClientID client_id_; const ObjectManagerConfig config_; std::shared_ptr object_directory_; @@ -389,10 +380,6 @@ class ObjectManager : public ObjectManagerInterface { /// all incoming object transfers. std::vector receive_threads_; - /// Reference to Plasma Store. This is used to get and put inlined objects in - /// the local object store. - plasma::PlasmaClient &store_client_; - /// Connection pool for reusing outgoing connections to remote object managers. ConnectionPool connection_pool_; @@ -400,12 +387,6 @@ class ObjectManager : public ObjectManagerInterface { /// including when the object was last pushed to other object managers. std::unordered_map local_objects_; - /// Set of objects created from inlined data whose locations and/or evictions - /// should not be reported to the GCS. This includes objects that were - /// created from data retrieved from the GCS, since a GCS entry with the - /// inlined data already exists. - std::unordered_set local_inlined_objects_; - /// This is used as the callback identifier in Pull for /// SubscribeObjectLocations. We only need one identifier because we never need to /// subscribe multiple times to the same object during Pull. diff --git a/src/ray/object_manager/test/object_manager_stress_test.cc b/src/ray/object_manager/test/object_manager_stress_test.cc index e7092955cc1f..91b0ffc3d576 100644 --- a/src/ray/object_manager/test/object_manager_stress_test.cc +++ b/src/ray/object_manager/test/object_manager_stress_test.cc @@ -30,16 +30,13 @@ class MockServer { public: MockServer(boost::asio::io_service &main_service, const ObjectManagerConfig &object_manager_config, - std::shared_ptr gcs_client, - const std::string &store_name) + std::shared_ptr gcs_client) : object_manager_acceptor_( main_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 0)), object_manager_socket_(main_service), gcs_client_(gcs_client), object_manager_(main_service, object_manager_config, - std::make_shared(main_service, gcs_client_), - store_client_) { - RAY_ARROW_CHECK_OK(store_client_.Connect(store_name.c_str())); + std::make_shared(main_service, gcs_client_)) { RAY_CHECK_OK(RegisterGcs(main_service)); // Start listening for clients. DoAcceptObjectManager(); @@ -91,7 +88,6 @@ class MockServer { boost::asio::ip::tcp::acceptor object_manager_acceptor_; boost::asio::ip::tcp::socket object_manager_socket_; std::shared_ptr gcs_client_; - plasma::PlasmaClient store_client_; ObjectManager object_manager_; }; @@ -146,7 +142,7 @@ class TestObjectManagerBase : public ::testing::Test { om_config_1.max_receives = max_receives_a; om_config_1.object_chunk_size = object_chunk_size; om_config_1.push_timeout_ms = push_timeout_ms; - server1.reset(new MockServer(main_service, om_config_1, gcs_client_1, store_id_1)); + server1.reset(new MockServer(main_service, om_config_1, gcs_client_1)); // start second server gcs_client_2 = std::shared_ptr( @@ -158,7 +154,7 @@ class TestObjectManagerBase : public ::testing::Test { om_config_2.max_receives = max_receives_b; om_config_2.object_chunk_size = object_chunk_size; om_config_2.push_timeout_ms = push_timeout_ms; - server2.reset(new MockServer(main_service, om_config_2, gcs_client_2, store_id_2)); + server2.reset(new MockServer(main_service, om_config_2, gcs_client_2)); // connect to stores. RAY_ARROW_CHECK_OK(client1.Connect(store_id_1)); diff --git a/src/ray/object_manager/test/object_manager_test.cc b/src/ray/object_manager/test/object_manager_test.cc index 3da9c6408cf7..699d119e41b3 100644 --- a/src/ray/object_manager/test/object_manager_test.cc +++ b/src/ray/object_manager/test/object_manager_test.cc @@ -10,7 +10,6 @@ namespace { std::string store_executable; int64_t wait_timeout_ms; -bool test_inline_objects = false; } namespace ray { @@ -25,16 +24,13 @@ class MockServer { public: MockServer(boost::asio::io_service &main_service, const ObjectManagerConfig &object_manager_config, - std::shared_ptr gcs_client, - const std::string &store_name) + std::shared_ptr gcs_client) : object_manager_acceptor_( main_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 0)), object_manager_socket_(main_service), gcs_client_(gcs_client), object_manager_(main_service, object_manager_config, - std::make_shared(main_service, gcs_client_), - store_client_) { - RAY_ARROW_CHECK_OK(store_client_.Connect(store_name.c_str())); + std::make_shared(main_service, gcs_client_)) { RAY_CHECK_OK(RegisterGcs(main_service)); // Start listening for clients. DoAcceptObjectManager(); @@ -86,7 +82,6 @@ class MockServer { boost::asio::ip::tcp::acceptor object_manager_acceptor_; boost::asio::ip::tcp::socket object_manager_socket_; std::shared_ptr gcs_client_; - plasma::PlasmaClient store_client_; ObjectManager object_manager_; }; @@ -135,7 +130,7 @@ class TestObjectManagerBase : public ::testing::Test { om_config_1.max_receives = max_receives; om_config_1.object_chunk_size = object_chunk_size; om_config_1.push_timeout_ms = push_timeout_ms; - server1.reset(new MockServer(main_service, om_config_1, gcs_client_1, store_id_1)); + server1.reset(new MockServer(main_service, om_config_1, gcs_client_1)); // start second server gcs_client_2 = std::shared_ptr( @@ -147,7 +142,7 @@ class TestObjectManagerBase : public ::testing::Test { om_config_2.max_receives = max_receives; om_config_2.object_chunk_size = object_chunk_size; om_config_2.push_timeout_ms = push_timeout_ms; - server2.reset(new MockServer(main_service, om_config_2, gcs_client_2, store_id_2)); + server2.reset(new MockServer(main_service, om_config_2, gcs_client_2)); // connect to stores. RAY_ARROW_CHECK_OK(client1.Connect(store_id_1)); @@ -299,10 +294,8 @@ class TestObjectManager : public TestObjectManagerBase { sub_id, object_1, [this, sub_id, object_1, object_2]( const ray::ObjectID &object_id, - const std::unordered_set &clients, bool inline_object_flag, - const std::vector inline_object_data, - const std::string inline_object_metadata, bool created) { - if (!clients.empty() || inline_object_flag) { + const std::unordered_set &clients, bool created) { + if (!clients.empty()) { TestWaitWhileSubscribed(sub_id, object_1, object_2); } })); @@ -341,14 +334,7 @@ class TestObjectManager : public TestObjectManagerBase { } void NextWaitTest() { - int data_size; - // Set the data size under or over the inline objects limit depending on - // the test configuration. - if (test_inline_objects) { - data_size = RayConfig::instance().inline_object_max_size_bytes() / 2; - } else { - data_size = RayConfig::instance().inline_object_max_size_bytes() * 2; - } + int data_size = 600; current_wait_test += 1; switch (current_wait_test) { case 0: { @@ -499,9 +485,5 @@ int main(int argc, char **argv) { ::testing::InitGoogleTest(&argc, argv); store_executable = std::string(argv[1]); wait_timeout_ms = std::stoi(std::string(argv[2])); - // If a third argument is provided, then test with inline objects. - if (argc > 3) { - test_inline_objects = true; - } return RUN_ALL_TESTS(); } diff --git a/src/ray/ray_config_def.h b/src/ray/ray_config_def.h index f995a1d68abd..a2fee310ac8a 100644 --- a/src/ray/ray_config_def.h +++ b/src/ray/ray_config_def.h @@ -134,11 +134,6 @@ RAY_CONFIG(int, object_manager_repeated_push_delay_ms, 60000); /// chunks exceeds the number of available sending threads. RAY_CONFIG(uint64_t, object_manager_default_chunk_size, 1000000); -/// Maximum size of an inline object (bytes). -/// Inline objects are objects whose data and metadata are inlined in the -/// GCS object table entry, which normally only specifies the object locations. -RAY_CONFIG(int64_t, inline_object_max_size_bytes, 512); - /// Number of workers per process RAY_CONFIG(int, num_workers_per_process, 1); diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 39f980d66b7f..74d5c9dee974 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -42,12 +42,10 @@ namespace raylet { NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeManagerConfig &config, ObjectManager &object_manager, std::shared_ptr gcs_client, - std::shared_ptr object_directory, - plasma::PlasmaClient &store_client) + std::shared_ptr object_directory) : client_id_(gcs_client->client_table().GetLocalClientId()), io_service_(io_service), object_manager_(object_manager), - store_client_(store_client), gcs_client_(std::move(gcs_client)), object_directory_(std::move(object_directory)), heartbeat_timer_(io_service), @@ -92,6 +90,8 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, })); RAY_CHECK_OK(object_manager_.SubscribeObjDeleted( [this](const ObjectID &object_id) { HandleObjectMissing(object_id); })); + + RAY_ARROW_CHECK_OK(store_client_.Connect(config.store_socket_name.c_str())); } ray::Status NodeManager::RegisterGcs() { @@ -1286,16 +1286,10 @@ void NodeManager::TreatTaskAsFailedIfLost(const Task &task) { object_id, [this, task_marked_as_failed, task]( const ray::ObjectID &object_id, - const std::unordered_set &clients, bool inline_object_flag, - const std::vector &inline_object_data, - const std::string &inline_object_metadata, bool has_been_created) { + const std::unordered_set &clients, bool has_been_created) { if (!*task_marked_as_failed) { // Only process the object locations if we haven't already marked the // task as failed. - if (inline_object_flag) { - // If object is inlined, we already have its data and metadata, so return. - return; - } if (clients.empty() && has_been_created) { // The object does not exist on any nodes but has been created // before, so the object has been lost. Mark the task as failed to @@ -1957,7 +1951,7 @@ void NodeManager::HandleObjectLocal(const ObjectID &object_id) { // Notify the task dependency manager that this object is local. const auto ready_task_ids = task_dependency_manager_.HandleObjectLocal(object_id); RAY_LOG(DEBUG) << "Object local " << object_id << ", " - << "on " << gcs_client_->client_table().GetLocalClientId() + << " on " << gcs_client_->client_table().GetLocalClientId() << ready_task_ids.size() << " tasks ready"; // Transition the tasks whose dependencies are now fulfilled to the ready state. if (ready_task_ids.size() > 0) { diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 47bc86e53dcc..061ef5ef8969 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -56,12 +56,10 @@ class NodeManager { /// /// \param resource_config The initial set of node resources. /// \param object_manager A reference to the local object manager. - /// \param reference to the local object store. NodeManager(boost::asio::io_service &io_service, const NodeManagerConfig &config, ObjectManager &object_manager, std::shared_ptr gcs_client, - std::shared_ptr object_directory_, - plasma::PlasmaClient &store_client); + std::shared_ptr object_directory_); /// Process a new client connection. /// @@ -440,7 +438,7 @@ class NodeManager { /// A Plasma object store client. This is used exclusively for creating new /// objects in the object store (e.g., for actor tasks that can't be run /// because the actor died). - plasma::PlasmaClient &store_client_; + plasma::PlasmaClient store_client_; /// A client connection to the GCS. std::shared_ptr gcs_client_; /// The object table. This is shared with the object manager. diff --git a/src/ray/raylet/raylet.cc b/src/ray/raylet/raylet.cc index a600aa166a4d..288f0a80b481 100644 --- a/src/ray/raylet/raylet.cc +++ b/src/ray/raylet/raylet.cc @@ -41,10 +41,9 @@ Raylet::Raylet(boost::asio::io_service &main_service, const std::string &socket_ std::shared_ptr gcs_client) : gcs_client_(gcs_client), object_directory_(std::make_shared(main_service, gcs_client_)), - object_manager_(main_service, object_manager_config, object_directory_, - store_client_), + object_manager_(main_service, object_manager_config, object_directory_), node_manager_(main_service, node_manager_config, object_manager_, gcs_client_, - object_directory_, store_client_), + object_directory_), socket_name_(socket_name), acceptor_(main_service, boost::asio::local::stream_protocol::endpoint(socket_name)), socket_(main_service), @@ -57,8 +56,6 @@ Raylet::Raylet(boost::asio::io_service &main_service, const std::string &socket_ boost::asio::ip::tcp::v4(), node_manager_config.node_manager_port)), node_manager_socket_(main_service) { - RAY_ARROW_CHECK_OK( - store_client_.Connect(node_manager_config.store_socket_name.c_str(), "", 0, 300)); // Start listening for clients. DoAccept(); DoAcceptObjectManager(); diff --git a/src/ray/raylet/raylet.h b/src/ray/raylet/raylet.h index 8f010ed512a6..84274ea6ecfe 100644 --- a/src/ray/raylet/raylet.h +++ b/src/ray/raylet/raylet.h @@ -73,10 +73,6 @@ class Raylet { /// The object table. This is shared between the object manager and node /// manager. std::shared_ptr object_directory_; - /// Reference to Plasma Store. - /// A connection to the Plasma Store. This is shared between the node manager - /// and the main thread of the object manager. - plasma::PlasmaClient store_client_; /// Manages client requests for object transfers and availability. ObjectManager object_manager_; /// Manages client requests for task submission and execution. diff --git a/src/ray/raylet/reconstruction_policy.cc b/src/ray/raylet/reconstruction_policy.cc index a98df6d493d3..d698402994a4 100644 --- a/src/ray/raylet/reconstruction_policy.cc +++ b/src/ray/raylet/reconstruction_policy.cc @@ -145,10 +145,8 @@ void ReconstructionPolicy::HandleTaskLeaseExpired(const TaskID &task_id) { created_object_id, [this, task_id, reconstruction_attempt]( const ray::ObjectID &object_id, - const std::unordered_set &clients, bool inline_object_flag, - const std::vector &inline_object_data, - const std::string &inline_object_metadata, bool created) { - if (clients.empty() && !inline_object_flag) { + const std::unordered_set &clients, bool created) { + if (clients.empty()) { // The required object no longer exists on any live nodes. Attempt // reconstruction. AttemptReconstruction(task_id, object_id, reconstruction_attempt, created); diff --git a/src/ray/raylet/reconstruction_policy_test.cc b/src/ray/raylet/reconstruction_policy_test.cc index 5d4756389e9a..5e9ae6d7e521 100644 --- a/src/ray/raylet/reconstruction_policy_test.cc +++ b/src/ray/raylet/reconstruction_policy_test.cc @@ -29,10 +29,10 @@ class MockObjectDirectory : public ObjectDirectoryInterface { const ObjectID object_id = callback.first; auto it = locations_.find(object_id); if (it == locations_.end()) { - callback.second(object_id, std::unordered_set(), false, {}, "", + callback.second(object_id, std::unordered_set(), /*created=*/false); } else { - callback.second(object_id, it->second, false, {}, "", /*created=*/true); + callback.second(object_id, it->second, /*created=*/true); } } callbacks_.clear(); @@ -60,11 +60,9 @@ class MockObjectDirectory : public ObjectDirectoryInterface { const OnLocationsFound &)); MOCK_METHOD2(UnsubscribeObjectLocations, ray::Status(const ray::UniqueID &, const ObjectID &)); - MOCK_METHOD5(ReportObjectAdded, + MOCK_METHOD3(ReportObjectAdded, ray::Status(const ObjectID &, const ClientID &, - const object_manager::protocol::ObjectInfoT &, bool, - const plasma::ObjectBuffer &)); - + const object_manager::protocol::ObjectInfoT &)); MOCK_METHOD2(ReportObjectRemoved, ray::Status(const ObjectID &, const ClientID &)); private: diff --git a/src/ray/test/run_object_manager_valgrind.sh b/src/ray/test/run_object_manager_valgrind.sh index 2f73abf9b4f9..12de9d118581 100644 --- a/src/ray/test/run_object_manager_valgrind.sh +++ b/src/ray/test/run_object_manager_valgrind.sh @@ -52,9 +52,6 @@ sleep 1s # in valgrind. $VALGRIND_CMD $CORE_DIR/src/ray/object_manager/object_manager_test $STORE_EXEC 10000 sleep 1s -# Run tests again with inlined objects. -$VALGRIND_CMD $CORE_DIR/src/ray/object_manager/object_manager_test $STORE_EXEC 10000 true -sleep 1s $VALGRIND_CMD $CORE_DIR/src/ray/object_manager/object_manager_stress_test $STORE_EXEC $REDIS_DIR/redis-cli -p 6379 shutdown sleep 1s