Skip to content

Commit

Permalink
Do not block worker threads forever if no event is received (#329)
Browse files Browse the repository at this point in the history
Make sure that long-poll request handlers do not block forever when no
corresponding event is received. Otherwise the clients will send one
request after another once their request times out client-side, until
the worker threads are exhausted and the server stops responding.

It would be great if we could also detect the client connection be
dropped using Keep-Alive, but cpp-httplib doesn't seem to have API for
that.
  • Loading branch information
frankosterfeld committed Nov 29, 2023
1 parent c19531d commit 260e752
Showing 1 changed file with 24 additions and 6 deletions.
30 changes: 24 additions & 6 deletions src/majordomo/include/majordomo/RestBackend.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,11 @@ using namespace std::chrono_literals;

constexpr auto HTTP_OK = 200;
constexpr auto HTTP_ERROR = 500;
constexpr auto HTTP_GATEWAY_TIMEOUT = 504;
constexpr auto DEFAULT_REST_PORT = 8080;
constexpr auto REST_POLLING_TIME = 10s;
constexpr auto UPDATER_POLLING_TIME = 1s;
constexpr auto LONG_POLL_SERVER_TIMEOUT = 30s;
constexpr auto UNUSED_SUBSCRIPTION_EXPIRATION_TIME = 30s;
constexpr std::size_t MAX_CACHED_REPLIES = 32;

Expand Down Expand Up @@ -154,8 +156,8 @@ std::string_view acceptedMimeForRequest(const auto &request) {
return acceptableMimeTypes[0];
}

bool respondWithError(auto &response, std::string_view message) {
response.status = HTTP_ERROR;
bool respondWithError(auto &response, std::string_view message, int status = HTTP_ERROR) {
response.status = status;

Check warning on line 160 in src/majordomo/include/majordomo/RestBackend.hpp

View check run for this annotation

Codecov / codecov/patch

src/majordomo/include/majordomo/RestBackend.hpp#L159-L160

Added lines #L159 - L160 were not covered by tests
response.set_content(message.data(), MIME::TEXT.typeName().data());
return true;
};
Expand Down Expand Up @@ -287,9 +289,18 @@ struct Connection {
return ReadLock(_cachedRepliesMutex);
}

void waitForUpdate() {
bool waitForUpdate(std::chrono::milliseconds timeout) {

Check warning on line 292 in src/majordomo/include/majordomo/RestBackend.hpp

View check run for this annotation

Codecov / codecov/patch

src/majordomo/include/majordomo/RestBackend.hpp#L292

Added line #L292 was not covered by tests
// This could also periodically check for the client connection being dropped (e.g. due to client-side timeout)
// if cpp-httplib had API for that.
auto temporaryLock = writeLock();
_pollingIndexCV.wait(temporaryLock);
const auto next = _nextPollingIndex;

Check warning on line 296 in src/majordomo/include/majordomo/RestBackend.hpp

View check run for this annotation

Codecov / codecov/patch

src/majordomo/include/majordomo/RestBackend.hpp#L296

Added line #L296 was not covered by tests
while (_nextPollingIndex == next) {
if (_pollingIndexCV.wait_for(temporaryLock, timeout) == std::cv_status::timeout) {
return false;

Check warning on line 299 in src/majordomo/include/majordomo/RestBackend.hpp

View check run for this annotation

Codecov / codecov/patch

src/majordomo/include/majordomo/RestBackend.hpp#L299

Added line #L299 was not covered by tests
}
}

return true;

Check warning on line 303 in src/majordomo/include/majordomo/RestBackend.hpp

View check run for this annotation

Codecov / codecov/patch

src/majordomo/include/majordomo/RestBackend.hpp#L303

Added line #L303 was not covered by tests
}

std::size_t cachedRepliesSize(ReadLock & /*lock*/) const {
Expand Down Expand Up @@ -465,6 +476,9 @@ class RestBackend : public Mode {

virtual ~RestBackend() {
_svr.stop();
// shutdown thread before _connectionForService is destroyed
_connectionUpdaterThread.request_stop();
_connectionUpdaterThread.join();
}

auto handleServiceRequest(const httplib::Request &request, httplib::Response &response, const httplib::ContentReader *content_reader_ = nullptr) {
Expand Down Expand Up @@ -737,7 +751,9 @@ struct RestBackend<Mode, VirtualFS, Roles...>::RestWorker {
[connection](std::size_t /*offset*/, httplib::DataSink &sink) mutable {
std::cerr << "Chunked reply...\n";

connection->waitForUpdate();
if (!connection->waitForUpdate(LONG_POLL_SERVER_TIMEOUT)) {
return false;

Check warning on line 755 in src/majordomo/include/majordomo/RestBackend.hpp

View check run for this annotation

Codecov / codecov/patch

src/majordomo/include/majordomo/RestBackend.hpp#L755

Added line #L755 was not covered by tests
}

auto connectionCacheLock = connection->readLock();
auto lastIndex = connection->nextPollingIndex(connectionCacheLock) - 1;
Expand Down Expand Up @@ -861,7 +877,9 @@ struct RestBackend<Mode, VirtualFS, Roles...>::RestWorker {
// Since we use KeepAlive object, the inital refCount can go away
connection->decreaseReferenceCount();

connection->waitForUpdate();
if (!connection->waitForUpdate(LONG_POLL_SERVER_TIMEOUT)) {
return detail::respondWithError(response, "Timeout waiting for update", HTTP_GATEWAY_TIMEOUT);
}

const auto newCache = fetchCache();

Expand Down

0 comments on commit 260e752

Please sign in to comment.