diff --git a/src/libclipper/include/clipper/rpc_service.hpp b/src/libclipper/include/clipper/rpc_service.hpp index 00fce67bd..0e4064524 100644 --- a/src/libclipper/include/clipper/rpc_service.hpp +++ b/src/libclipper/include/clipper/rpc_service.hpp @@ -145,11 +145,11 @@ class RPCService { void shutdown_service(socket_t &socket); std::thread rpc_thread_; - shared_ptr> request_queue_; + std::unique_ptr> request_queue_; // Flag indicating whether rpc service is active std::atomic_bool active_; // The next available message id - int message_id_ = 0; + std::atomic_int message_id_{0}; std::chrono::system_clock::time_point last_activity_check_time_; std::unordered_map replica_ids_; std::shared_ptr msg_queueing_hist_; diff --git a/src/libclipper/include/clipper/threadpool.hpp b/src/libclipper/include/clipper/threadpool.hpp index 50dcd07ac..a7c2caa45 100644 --- a/src/libclipper/include/clipper/threadpool.hpp +++ b/src/libclipper/include/clipper/threadpool.hpp @@ -215,19 +215,31 @@ class ThreadPool { ss << thread_id; try { - while (!done_ && queues_[worker_id].is_valid()) { + while (!done_) { boost::this_thread::interruption_point(); std::unique_ptr pTask{nullptr}; bool work_to_do = false; { boost::shared_lock l(queues_mutex_); + auto queue = queues_.find(worker_id); + if (queue == queues_.end()) { + log_error_formatted(LOGGING_TAG_THREADPOOL, + "Work queue does not exist for worker {}", + worker_id); + break; + } + + if (!queue->second.is_valid()) { + break; + } + if (is_block_worker) { - work_to_do = queues_[worker_id].wait_pop(pTask, l); + work_to_do = queue->second.wait_pop(pTask, l); } else { // NOTE: The use of try_pop here means the worker will spin instead of // block while waiting for work. This is intentional. We defer to the // submitted tasks to block when no work is available. - work_to_do = queues_[worker_id].try_pop(pTask); + work_to_do = queue->second.try_pop(pTask); } } if (work_to_do) { @@ -237,11 +249,11 @@ class ThreadPool { } catch (boost::thread_interrupted&) { log_info_formatted(LOGGING_TAG_THREADPOOL, "Worker {}, thread {} is interrupted", - std::to_string(worker_id), ss.str()); + worker_id, ss.str()); } log_info_formatted(LOGGING_TAG_THREADPOOL, "Worker {}, thread {} is shutting down", - std::to_string(worker_id), ss.str()); + worker_id, ss.str()); } /** @@ -280,22 +292,24 @@ class ModelQueueThreadPool : public ThreadPool { try { ThreadPool::submit(queue_id, func, args...); } catch (ThreadPoolError& e) { - log_error_formatted(LOGGING_TAG_THREADPOOL, - "Failed to submit task for model {}, replica {}", - vm.serialize(), replica_id); + log_error_formatted( + LOGGING_TAG_THREADPOOL, + "Failed to submit task for model {}, replica {}: queue id {}", + vm.serialize(), replica_id, queue_id); throw(e); } } bool create_queue(VersionedModelId vm, int replica_id, bool is_block_worker) { - boost::unique_lock lock_q(queues_mutex_); boost::unique_lock lock_t(threads_mutex_); + boost::unique_lock lock_q(queues_mutex_); size_t queue_id = get_queue_id(vm, replica_id); auto queue = queues_.find(queue_id); if (queue != queues_.end()) { - log_error_formatted(LOGGING_TAG_THREADPOOL, - "Work queue already exists for model {}, replica {}", - vm.serialize(), std::to_string(replica_id)); + log_error_formatted( + LOGGING_TAG_THREADPOOL, + "Work queue already exists for model {}, replica {}: queue id {}", + vm.serialize(), replica_id, queue_id); return false; } else { queues_.emplace(std::piecewise_construct, std::forward_as_tuple(queue_id), @@ -304,9 +318,10 @@ class ModelQueueThreadPool : public ThreadPool { std::forward_as_tuple(queue_id), std::forward_as_tuple(&ModelQueueThreadPool::worker, this, queue_id, is_block_worker)); - log_info_formatted(LOGGING_TAG_THREADPOOL, - "Work queue created for model {}, replica {}", - vm.serialize(), std::to_string(replica_id)); + log_info_formatted( + LOGGING_TAG_THREADPOOL, + "Work queue created for model {}, replica {}: queue id {}", + vm.serialize(), replica_id, queue_id); return true; } } @@ -316,28 +331,31 @@ class ModelQueueThreadPool : public ThreadPool { size_t queue_id = get_queue_id(vm, replica_id); auto thread = threads_.find(queue_id); if (thread == threads_.end()) { - log_error_formatted(LOGGING_TAG_THREADPOOL, - "Thread does not exist for model {}, replica {}", - vm.serialize(), replica_id); + log_error_formatted( + LOGGING_TAG_THREADPOOL, + "Thread does not exist for model {}, replica {}: queue id {}", + vm.serialize(), replica_id, queue_id); return false; } else { thread->second.interrupt(); - log_info_formatted(LOGGING_TAG_THREADPOOL, - "Thread is interrupted for model {}, replica {}", - vm.serialize(), replica_id); + log_info_formatted( + LOGGING_TAG_THREADPOOL, + "Thread is interrupted for model {}, replica {}: queue id {}", + vm.serialize(), replica_id, queue_id); return true; } } bool delete_queue(VersionedModelId vm, const int replica_id) { - boost::unique_lock lock_q(queues_mutex_); boost::unique_lock lock_t(threads_mutex_); + boost::upgrade_lock lock_q_upgrade(queues_mutex_); size_t queue_id = get_queue_id(vm, replica_id); auto queue = queues_.find(queue_id); if (queue == queues_.end() || !queue->second.is_valid()) { - log_error_formatted(LOGGING_TAG_THREADPOOL, - "Work queue does not exist for model {}, replica {}", - vm.serialize(), replica_id); + log_error_formatted( + LOGGING_TAG_THREADPOOL, + "Work queue does not exist for model {}, replica {}: queue id {}", + vm.serialize(), replica_id, queue_id); return false; } @@ -348,12 +366,15 @@ class ModelQueueThreadPool : public ThreadPool { thread->second.join(); } + boost::upgrade_to_unique_lock lock_q_unique( + lock_q_upgrade); queues_.erase(queue); threads_.erase(thread); - log_info_formatted(LOGGING_TAG_THREADPOOL, - "Work queue destroyed for model {}, replica {}", - vm.serialize(), replica_id); + log_info_formatted( + LOGGING_TAG_THREADPOOL, + "Work queue destroyed for model {}, replica {}: queue_id {}", + vm.serialize(), replica_id, queue_id); return true; } diff --git a/src/libclipper/src/rpc_service.cpp b/src/libclipper/src/rpc_service.cpp index 566752935..5256b13b7 100644 --- a/src/libclipper/src/rpc_service.cpp +++ b/src/libclipper/src/rpc_service.cpp @@ -42,7 +42,7 @@ void RPCDataStore::remove_data(void *data) { } RPCService::RPCService() - : request_queue_(std::make_shared>( + : request_queue_(std::make_unique>( sizeof(RPCRequest) * 10000)), active_(false), last_activity_check_time_(std::chrono::system_clock::now()), @@ -91,8 +91,7 @@ int RPCService::send_message(vector msg, "Dropping Message"); return -1; } - int id = message_id_; - message_id_ += 1; + int id = message_id_.fetch_add(1); long current_time_micros = std::chrono::duration_cast( std::chrono::system_clock::now().time_since_epoch()) @@ -117,8 +116,8 @@ void RPCService::manage_service(const string address) { std::unordered_map, ConnectedContainerInfo, std::function &vec)>> connections_containers_map(INITIAL_REPLICA_ID_SIZE, hash_vector); - context_t context = context_t(1); - socket_t socket = socket_t(context, ZMQ_ROUTER); + context_t context(1); + socket_t socket(context, ZMQ_ROUTER); socket.bind(address); // Indicate that we will poll our zmq service socket for new inbound messages zmq::pollitem_t items[] = {{socket, 0, ZMQ_POLLIN, 0}}; @@ -371,7 +370,6 @@ void RPCService::receive_message( connection_id, std::make_tuple(model, cur_replica_id, connected_time)); - TaskExecutionThreadPool::create_queue(model, cur_replica_id); zmq_connection_id += 1; } } break;