Skip to content

Commit

Permalink
Add draft for LocalCluster
Browse files Browse the repository at this point in the history
  • Loading branch information
uNetworkingAB committed Jan 22, 2024
1 parent 1401f59 commit 0118731
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 1 deletion.
2 changes: 1 addition & 1 deletion build.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ int main(int argc, char **argv) {
char *CXX = strcpy(calloc(1024, 1), or_else(getenv("CXX"), "g++"));
char *EXEC_SUFFIX = strcpy(calloc(1024, 1), maybe(getenv("EXEC_SUFFIX")));

char *EXAMPLE_FILES[] = {"Http3Server", "Broadcast", "HelloWorld", "Crc32", "ServerName",
char *EXAMPLE_FILES[] = {"LoadBalancer", "Http3Server", "Broadcast", "HelloWorld", "Crc32", "ServerName",
"EchoServer", "BroadcastingEchoServer", "UpgradeSync", "UpgradeAsync", "ParameterRoutes"};

strcat(CXXFLAGS, " -march=native -O3 -Wpedantic -Wall -Wextra -Wsign-conversion -Wconversion -std=c++20 -Isrc -IuSockets/src");
Expand Down
101 changes: 101 additions & 0 deletions examples/LoadBalancer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
#include "App.h"
#include <thread>
#include <algorithm>
#include <mutex>

/* Note that SSL is disabled unless you build with WITH_OPENSSL=1 */
const int SSL = 1;

unsigned int roundRobin = 0;
unsigned int hardwareConcurrency = std::thread::hardware_concurrency();
std::vector<std::thread *> threads(hardwareConcurrency);
std::vector<uWS::SSLApp *> apps;
std::mutex m;

namespace uWS {
struct LocalCluster {

//std::vector<std::thread *> threads = std::thread::hardware_concurrency();
std::vector<uWS::SSLApp *> apps;
std::mutex m;


static void loadBalancer() {
static std::atomic<unsigned int> roundRobin = 0; // atomic fetch_add
}

LocalCluster(SocketContextOptions options = {}, std::function<void(uWS::SSLApp &)> cb = nullptr) {

}
};
}

int main() {

// can be strictly round robin or not

// uWS::LocalCluster({
// .key_file_name = "misc/key.pem",
// .cert_file_name = "misc/cert.pem",
// .passphrase = "1234"
// },
// [](uWS::SSLApp &app) {
// /* Here this App instance is defined */
// app.get("/*", [](auto *res, auto * /*req*/) {
// res->end("Hello world!");
// }).listen(3000, [](auto *listen_socket) {
// if (listen_socket) {
// /* Note that us_listen_socket_t is castable to us_socket_t */
// std::cout << "Thread " << std::this_thread::get_id() << " listening on port " << us_socket_local_port(SSL, (struct us_socket_t *) listen_socket) << std::endl;
// } else {
// std::cout << "Thread " << std::this_thread::get_id() << " failed to listen on port 3000" << std::endl;
// }
// });
// });

std::transform(threads.begin(), threads.end(), threads.begin(), [](std::thread *) {

return new std::thread([]() {

// lock this
m.lock();
apps.emplace_back(new uWS::SSLApp({
.key_file_name = "misc/key.pem",
.cert_file_name = "misc/cert.pem",
.passphrase = "1234"
}));
uWS::SSLApp *app = apps.back();

app->get("/*", [](auto *res, auto * /*req*/) {
res->end("Hello world!");
}).listen(3000, [](auto *listen_socket) {
if (listen_socket) {
/* Note that us_listen_socket_t is castable to us_socket_t */
std::cout << "Thread " << std::this_thread::get_id() << " listening on port " << us_socket_local_port(SSL, (struct us_socket_t *) listen_socket) << std::endl;
} else {
std::cout << "Thread " << std::this_thread::get_id() << " failed to listen on port 3000" << std::endl;
}
}).preOpen([](LIBUS_SOCKET_DESCRIPTOR fd) {

/* Distribute this socket in round robin fashion */
std::cout << "About to load balance " << fd << " to " << roundRobin << std::endl;

auto receivingApp = apps[roundRobin];
apps[roundRobin]->getLoop()->defer([fd, receivingApp]() {
receivingApp->adoptSocket(fd);
});

roundRobin = (roundRobin + 1) % hardwareConcurrency;
return -1;
});
m.unlock();
app->run();
std::cout << "Fallthrough!" << std::endl;
delete app;
});
});

std::for_each(threads.begin(), threads.end(), [](std::thread *t) {
t->join();
});
}
10 changes: 10 additions & 0 deletions src/App.h
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,12 @@ struct TemplatedApp {
return std::move(*this);
}

/* Register event handler for accepted FD. Can be used together with adoptSocket. */
TemplatedApp &&preOpen(LIBUS_SOCKET_DESCRIPTOR (*handler)(LIBUS_SOCKET_DESCRIPTOR)) {
httpContext->onPreOpen(handler);
return std::move(*this);
}

/* adopt an externally accepted socket */
TemplatedApp &&adoptSocket(LIBUS_SOCKET_DESCRIPTOR accepted_fd) {
httpContext->adoptAcceptedSocket(accepted_fd);
Expand All @@ -586,6 +592,10 @@ struct TemplatedApp {
return std::move(*this);
}

Loop *getLoop() {
return (Loop *) httpContext->getLoop();
}

};

typedef TemplatedApp<false> App;
Expand Down
8 changes: 8 additions & 0 deletions src/HttpContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ struct HttpContext {
/* Minimum allowed receive throughput per second (clients uploading less than 16kB/sec get dropped) */
static const int HTTP_RECEIVE_THROUGHPUT_BYTES = 16 * 1024;

us_loop_t *getLoop() {
return us_socket_context_loop(SSL, getSocketContext());
}

us_socket_context_t *getSocketContext() {
return (us_socket_context_t *) this;
}
Expand Down Expand Up @@ -483,6 +487,10 @@ struct HttpContext {
return us_socket_context_listen_unix(SSL, getSocketContext(), path, options, sizeof(HttpResponseData<SSL>));
}

void onPreOpen(LIBUS_SOCKET_DESCRIPTOR (*handler)(LIBUS_SOCKET_DESCRIPTOR)) {
us_socket_context_on_pre_open(SSL, getSocketContext(), handler);
}

/* Adopt an externally accepted socket into this HttpContext */
us_socket_t *adoptAcceptedSocket(LIBUS_SOCKET_DESCRIPTOR accepted_fd) {
return us_adopt_accepted_socket(SSL, getSocketContext(), accepted_fd, sizeof(HttpResponseData<SSL>), 0, 0);
Expand Down

0 comments on commit 0118731

Please sign in to comment.