Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix potential deadlock when deleting queue #759

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/libclipper/include/clipper/rpc_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,11 @@ class RPCService {

void shutdown_service(socket_t &socket);
std::thread rpc_thread_;
shared_ptr<moodycamel::ConcurrentQueue<RPCRequest>> request_queue_;
std::unique_ptr<moodycamel::ConcurrentQueue<RPCRequest>> request_queue_;
// Flag indicating whether rpc service is active
std::atomic_bool active_;
// The next available message id
int message_id_ = 0;
std::atomic_int message_id_{0};
std::chrono::system_clock::time_point last_activity_check_time_;
std::unordered_map<VersionedModelId, int> replica_ids_;
std::shared_ptr<metrics::Histogram> msg_queueing_hist_;
Expand Down
77 changes: 49 additions & 28 deletions src/libclipper/include/clipper/threadpool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,19 +215,31 @@ class ThreadPool {
ss << thread_id;

try {
while (!done_ && queues_[worker_id].is_valid()) {
while (!done_) {
boost::this_thread::interruption_point();
std::unique_ptr<IThreadTask> pTask{nullptr};
bool work_to_do = false;
{
boost::shared_lock<boost::shared_mutex> l(queues_mutex_);
auto queue = queues_.find(worker_id);
if (queue == queues_.end()) {
log_error_formatted(LOGGING_TAG_THREADPOOL,
"Work queue does not exist for worker {}",
worker_id);
break;
}

if (!queue->second.is_valid()) {
break;
}

if (is_block_worker) {
work_to_do = queues_[worker_id].wait_pop(pTask, l);
work_to_do = queue->second.wait_pop(pTask, l);
} else {
// NOTE: The use of try_pop here means the worker will spin instead of
// block while waiting for work. This is intentional. We defer to the
// submitted tasks to block when no work is available.
work_to_do = queues_[worker_id].try_pop(pTask);
work_to_do = queue->second.try_pop(pTask);
}
}
if (work_to_do) {
Expand All @@ -237,11 +249,11 @@ class ThreadPool {
} catch (boost::thread_interrupted&) {
log_info_formatted(LOGGING_TAG_THREADPOOL,
"Worker {}, thread {} is interrupted",
std::to_string(worker_id), ss.str());
worker_id, ss.str());
}
log_info_formatted(LOGGING_TAG_THREADPOOL,
"Worker {}, thread {} is shutting down",
std::to_string(worker_id), ss.str());
worker_id, ss.str());
}

/**
Expand Down Expand Up @@ -280,22 +292,24 @@ class ModelQueueThreadPool : public ThreadPool {
try {
ThreadPool::submit(queue_id, func, args...);
} catch (ThreadPoolError& e) {
log_error_formatted(LOGGING_TAG_THREADPOOL,
"Failed to submit task for model {}, replica {}",
vm.serialize(), replica_id);
log_error_formatted(
LOGGING_TAG_THREADPOOL,
"Failed to submit task for model {}, replica {}: queue id {}",
vm.serialize(), replica_id, queue_id);
throw(e);
}
}

bool create_queue(VersionedModelId vm, int replica_id, bool is_block_worker) {
boost::unique_lock<boost::shared_mutex> lock_q(queues_mutex_);
boost::unique_lock<boost::shared_mutex> lock_t(threads_mutex_);
boost::unique_lock<boost::shared_mutex> lock_q(queues_mutex_);
size_t queue_id = get_queue_id(vm, replica_id);
auto queue = queues_.find(queue_id);
if (queue != queues_.end()) {
log_error_formatted(LOGGING_TAG_THREADPOOL,
"Work queue already exists for model {}, replica {}",
vm.serialize(), std::to_string(replica_id));
log_error_formatted(
LOGGING_TAG_THREADPOOL,
"Work queue already exists for model {}, replica {}: queue id {}",
vm.serialize(), replica_id, queue_id);
return false;
} else {
queues_.emplace(std::piecewise_construct, std::forward_as_tuple(queue_id),
Expand All @@ -304,9 +318,10 @@ class ModelQueueThreadPool : public ThreadPool {
std::forward_as_tuple(queue_id),
std::forward_as_tuple(&ModelQueueThreadPool::worker,
this, queue_id, is_block_worker));
log_info_formatted(LOGGING_TAG_THREADPOOL,
"Work queue created for model {}, replica {}",
vm.serialize(), std::to_string(replica_id));
log_info_formatted(
LOGGING_TAG_THREADPOOL,
"Work queue created for model {}, replica {}: queue id {}",
vm.serialize(), replica_id, queue_id);
return true;
}
}
Expand All @@ -316,28 +331,31 @@ class ModelQueueThreadPool : public ThreadPool {
size_t queue_id = get_queue_id(vm, replica_id);
auto thread = threads_.find(queue_id);
if (thread == threads_.end()) {
log_error_formatted(LOGGING_TAG_THREADPOOL,
"Thread does not exist for model {}, replica {}",
vm.serialize(), replica_id);
log_error_formatted(
LOGGING_TAG_THREADPOOL,
"Thread does not exist for model {}, replica {}: queue id {}",
vm.serialize(), replica_id, queue_id);
return false;
} else {
thread->second.interrupt();
log_info_formatted(LOGGING_TAG_THREADPOOL,
"Thread is interrupted for model {}, replica {}",
vm.serialize(), replica_id);
log_info_formatted(
LOGGING_TAG_THREADPOOL,
"Thread is interrupted for model {}, replica {}: queue id {}",
vm.serialize(), replica_id, queue_id);
return true;
}
}

bool delete_queue(VersionedModelId vm, const int replica_id) {
boost::unique_lock<boost::shared_mutex> lock_q(queues_mutex_);
boost::unique_lock<boost::shared_mutex> lock_t(threads_mutex_);
boost::upgrade_lock<boost::shared_mutex> lock_q_upgrade(queues_mutex_);
size_t queue_id = get_queue_id(vm, replica_id);
auto queue = queues_.find(queue_id);
if (queue == queues_.end() || !queue->second.is_valid()) {
log_error_formatted(LOGGING_TAG_THREADPOOL,
"Work queue does not exist for model {}, replica {}",
vm.serialize(), replica_id);
log_error_formatted(
LOGGING_TAG_THREADPOOL,
"Work queue does not exist for model {}, replica {}: queue id {}",
vm.serialize(), replica_id, queue_id);
return false;
}

Expand All @@ -348,12 +366,15 @@ class ModelQueueThreadPool : public ThreadPool {
thread->second.join();
}

boost::upgrade_to_unique_lock<boost::shared_mutex> lock_q_unique(
lock_q_upgrade);
queues_.erase(queue);
threads_.erase(thread);

log_info_formatted(LOGGING_TAG_THREADPOOL,
"Work queue destroyed for model {}, replica {}",
vm.serialize(), replica_id);
log_info_formatted(
LOGGING_TAG_THREADPOOL,
"Work queue destroyed for model {}, replica {}: queue_id {}",
vm.serialize(), replica_id, queue_id);
return true;
}

Expand Down
10 changes: 4 additions & 6 deletions src/libclipper/src/rpc_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ void RPCDataStore::remove_data(void *data) {
}

RPCService::RPCService()
: request_queue_(std::make_shared<moodycamel::ConcurrentQueue<RPCRequest>>(
: request_queue_(std::make_unique<moodycamel::ConcurrentQueue<RPCRequest>>(
sizeof(RPCRequest) * 10000)),
active_(false),
last_activity_check_time_(std::chrono::system_clock::now()),
Expand Down Expand Up @@ -91,8 +91,7 @@ int RPCService::send_message(vector<ByteBuffer> msg,
"Dropping Message");
return -1;
}
int id = message_id_;
message_id_ += 1;
int id = message_id_.fetch_add(1);
long current_time_micros =
std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::system_clock::now().time_since_epoch())
Expand All @@ -117,8 +116,8 @@ void RPCService::manage_service(const string address) {
std::unordered_map<std::vector<uint8_t>, ConnectedContainerInfo,
std::function<size_t(const std::vector<uint8_t> &vec)>>
connections_containers_map(INITIAL_REPLICA_ID_SIZE, hash_vector<uint8_t>);
context_t context = context_t(1);
socket_t socket = socket_t(context, ZMQ_ROUTER);
context_t context(1);
socket_t socket(context, ZMQ_ROUTER);
socket.bind(address);
// Indicate that we will poll our zmq service socket for new inbound messages
zmq::pollitem_t items[] = {{socket, 0, ZMQ_POLLIN, 0}};
Expand Down Expand Up @@ -371,7 +370,6 @@ void RPCService::receive_message(
connection_id,
std::make_tuple(model, cur_replica_id, connected_time));

TaskExecutionThreadPool::create_queue(model, cur_replica_id);
zmq_connection_id += 1;
}
} break;
Expand Down