From 260e75288a71f80c8cd6092a92beac1eb5cf50a3 Mon Sep 17 00:00:00 2001 From: Frank Osterfeld Date: Mon, 27 Nov 2023 12:47:34 +0100 Subject: [PATCH] Do not block worker threads forever if no event is received (#329) Make sure that long-poll request handlers do not block forever when no corresponding event is received. Otherwise the clients will send one request after another once their request times out client-side, until the worker threads are exhausted and the server stops responding. It would be great if we could also detect the client connection be dropped using Keep-Alive, but cpp-httplib doesn't seem to have API for that. --- .../include/majordomo/RestBackend.hpp | 30 +++++++++++++++---- 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/src/majordomo/include/majordomo/RestBackend.hpp b/src/majordomo/include/majordomo/RestBackend.hpp index b353984d..eaffaaab 100644 --- a/src/majordomo/include/majordomo/RestBackend.hpp +++ b/src/majordomo/include/majordomo/RestBackend.hpp @@ -61,9 +61,11 @@ using namespace std::chrono_literals; constexpr auto HTTP_OK = 200; constexpr auto HTTP_ERROR = 500; +constexpr auto HTTP_GATEWAY_TIMEOUT = 504; constexpr auto DEFAULT_REST_PORT = 8080; constexpr auto REST_POLLING_TIME = 10s; constexpr auto UPDATER_POLLING_TIME = 1s; +constexpr auto LONG_POLL_SERVER_TIMEOUT = 30s; constexpr auto UNUSED_SUBSCRIPTION_EXPIRATION_TIME = 30s; constexpr std::size_t MAX_CACHED_REPLIES = 32; @@ -154,8 +156,8 @@ std::string_view acceptedMimeForRequest(const auto &request) { return acceptableMimeTypes[0]; } -bool respondWithError(auto &response, std::string_view message) { - response.status = HTTP_ERROR; +bool respondWithError(auto &response, std::string_view message, int status = HTTP_ERROR) { + response.status = status; response.set_content(message.data(), MIME::TEXT.typeName().data()); return true; }; @@ -287,9 +289,18 @@ struct Connection { return ReadLock(_cachedRepliesMutex); } - void waitForUpdate() { + bool waitForUpdate(std::chrono::milliseconds timeout) { + // This could also periodically check for the client connection being dropped (e.g. due to client-side timeout) + // if cpp-httplib had API for that. auto temporaryLock = writeLock(); - _pollingIndexCV.wait(temporaryLock); + const auto next = _nextPollingIndex; + while (_nextPollingIndex == next) { + if (_pollingIndexCV.wait_for(temporaryLock, timeout) == std::cv_status::timeout) { + return false; + } + } + + return true; } std::size_t cachedRepliesSize(ReadLock & /*lock*/) const { @@ -465,6 +476,9 @@ class RestBackend : public Mode { virtual ~RestBackend() { _svr.stop(); + // shutdown thread before _connectionForService is destroyed + _connectionUpdaterThread.request_stop(); + _connectionUpdaterThread.join(); } auto handleServiceRequest(const httplib::Request &request, httplib::Response &response, const httplib::ContentReader *content_reader_ = nullptr) { @@ -737,7 +751,9 @@ struct RestBackend::RestWorker { [connection](std::size_t /*offset*/, httplib::DataSink &sink) mutable { std::cerr << "Chunked reply...\n"; - connection->waitForUpdate(); + if (!connection->waitForUpdate(LONG_POLL_SERVER_TIMEOUT)) { + return false; + } auto connectionCacheLock = connection->readLock(); auto lastIndex = connection->nextPollingIndex(connectionCacheLock) - 1; @@ -861,7 +877,9 @@ struct RestBackend::RestWorker { // Since we use KeepAlive object, the inital refCount can go away connection->decreaseReferenceCount(); - connection->waitForUpdate(); + if (!connection->waitForUpdate(LONG_POLL_SERVER_TIMEOUT)) { + return detail::respondWithError(response, "Timeout waiting for update", HTTP_GATEWAY_TIMEOUT); + } const auto newCache = fetchCache();