Skip to content

Commit

Permalink
Expose new LocalCluster interface and use in example
Browse files Browse the repository at this point in the history
  • Loading branch information
uNetworkingAB committed Jan 22, 2024
1 parent 0118731 commit b834275
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 136 deletions.
2 changes: 1 addition & 1 deletion build.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ int main(int argc, char **argv) {
char *CXX = strcpy(calloc(1024, 1), or_else(getenv("CXX"), "g++"));
char *EXEC_SUFFIX = strcpy(calloc(1024, 1), maybe(getenv("EXEC_SUFFIX")));

char *EXAMPLE_FILES[] = {"LoadBalancer", "Http3Server", "Broadcast", "HelloWorld", "Crc32", "ServerName",
char *EXAMPLE_FILES[] = {"HelloWorldThreaded", "Http3Server", "Broadcast", "HelloWorld", "Crc32", "ServerName",
"EchoServer", "BroadcastingEchoServer", "UpgradeSync", "UpgradeAsync", "ParameterRoutes"};

strcat(CXXFLAGS, " -march=native -O3 -Wpedantic -Wall -Wextra -Wsign-conversion -Wconversion -std=c++20 -Isrc -IuSockets/src");
Expand Down
52 changes: 18 additions & 34 deletions examples/HelloWorldThreaded.cpp
Original file line number Diff line number Diff line change
@@ -1,40 +1,24 @@
#include "App.h"
#include <thread>
#include <algorithm>
#include <mutex>

/* Note that SSL is disabled unless you build with WITH_OPENSSL=1 */
const int SSL = 1;
std::mutex stdoutMutex;
#include "LocalCluster.h"

int main() {
/* Overly simple hello world app, using multiple threads */
std::vector<std::thread *> threads(std::thread::hardware_concurrency());

std::transform(threads.begin(), threads.end(), threads.begin(), [](std::thread */*t*/) {
return new std::thread([]() {

uWS::SSLApp({
.key_file_name = "misc/key.pem",
.cert_file_name = "misc/cert.pem",
.passphrase = "1234"
}).get("/*", [](auto *res, auto * /*req*/) {
res->end("Hello world!");
}).listen(3000, [](auto *listen_socket) {
stdoutMutex.lock();
if (listen_socket) {
/* Note that us_listen_socket_t is castable to us_socket_t */
std::cout << "Thread " << std::this_thread::get_id() << " listening on port " << us_socket_local_port(SSL, (struct us_socket_t *) listen_socket) << std::endl;
} else {
std::cout << "Thread " << std::this_thread::get_id() << " failed to listen on port 3000" << std::endl;
}
stdoutMutex.unlock();
}).run();

/* Note that SSL is disabled unless you build with WITH_OPENSSL=1 */
uWS::LocalCluster({
.key_file_name = "misc/key.pem",
.cert_file_name = "misc/cert.pem",
.passphrase = "1234"
},
[](uWS::SSLApp &app) {
/* Here this App instance is defined */
app.get("/*", [](auto *res, auto * /*req*/) {
res->end("Hello world!");
}).listen(3000, [](auto *listen_socket) {
if (listen_socket) {
/* Note that us_listen_socket_t is castable to us_socket_t */
std::cout << "Thread " << std::this_thread::get_id() << " listening on port " << us_socket_local_port(true, (struct us_socket_t *) listen_socket) << std::endl;
} else {
std::cout << "Thread " << std::this_thread::get_id() << " failed to listen on port 3000" << std::endl;
}
});
});

std::for_each(threads.begin(), threads.end(), [](std::thread *t) {
t->join();
});
}
101 changes: 0 additions & 101 deletions examples/LoadBalancer.cpp

This file was deleted.

62 changes: 62 additions & 0 deletions src/LocalCluster.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/* This header is highly experimental and needs refactorings but will do for now */

#include <thread>
#include <algorithm>
#include <mutex>

unsigned int roundRobin = 0;
unsigned int hardwareConcurrency = std::thread::hardware_concurrency();
std::vector<std::thread *> threads(hardwareConcurrency);
std::vector<uWS::SSLApp *> apps;
std::mutex m;

namespace uWS {
struct LocalCluster {

//std::vector<std::thread *> threads = std::thread::hardware_concurrency();
//std::vector<uWS::SSLApp *> apps;
//std::mutex m;


static void loadBalancer() {
static std::atomic<unsigned int> roundRobin = 0; // atomic fetch_add
}

LocalCluster(SocketContextOptions options = {}, std::function<void(uWS::SSLApp &)> cb = nullptr) {
std::transform(threads.begin(), threads.end(), threads.begin(), [options, &cb](std::thread *) {

return new std::thread([options, &cb]() {

// lock this
m.lock();
apps.emplace_back(new uWS::SSLApp(options));
uWS::SSLApp *app = apps.back();

cb(*app);

app->preOpen([](LIBUS_SOCKET_DESCRIPTOR fd) {

/* Distribute this socket in round robin fashion */
//std::cout << "About to load balance " << fd << " to " << roundRobin << std::endl;

auto receivingApp = apps[roundRobin];
apps[roundRobin]->getLoop()->defer([fd, receivingApp]() {
receivingApp->adoptSocket(fd);
});

roundRobin = (roundRobin + 1) % hardwareConcurrency;
return -1;
});
m.unlock();
app->run();
std::cout << "Fallthrough!" << std::endl;
delete app;
});
});

std::for_each(threads.begin(), threads.end(), [](std::thread *t) {
t->join();
});
}
};
}

0 comments on commit b834275

Please sign in to comment.