Skip to content

Commit

Permalink
Merge branch 'tickets/DM-45595'
Browse files Browse the repository at this point in the history
  • Loading branch information
iagaponenko committed Oct 4, 2024
2 parents ac67574 + 0ff2e7b commit 40a0184
Show file tree
Hide file tree
Showing 13 changed files with 347 additions and 140 deletions.
2 changes: 2 additions & 0 deletions src/replica/config/ConfigTestData.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ map<string, set<string>> ConfigTestData::parameters() {
"async-loader-auto-resume",
"async-loader-cleanup-on-resume",
"http-max-listen-conn",
"http-max-queued-requests",
"svc-port",
"fs-port",
"data-dir",
Expand Down Expand Up @@ -124,6 +125,7 @@ json ConfigTestData::data() {
{"async-loader-auto-resume", 0},
{"async-loader-cleanup-on-resume", 0},
{"http-max-listen-conn", 512},
{"http-max-queued-requests", 1024},
{"svc-port", 51000},
{"fs-port", 52000},
{"data-dir", "/data"},
Expand Down
8 changes: 8 additions & 0 deletions src/replica/config/ConfigurationSchema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,14 @@ json const ConfigurationSchema::_schemaJson = json::object(
"The maximum length of the queue of pending connections sent to the Replication worker's"
" HTTP-based ingest service. Must be greater than 0."},
{"default", max_listen_connections}}},
// Schema entry for the limit on requests that were accept()ed by the listener but
// are still waiting to be routed by the HTTP server. A value of 0 means that no
// specific limit is enforced, and 0 is also the default.
{"http-max-queued-requests",
{{"description",
"The maximum number of pending requests, i.e. requests accept()ed by"
" the listener but still waiting to be routed by the HTTP server."
" If set to 0 then no specific limit will be enforced. It's recommended to keep"
" the default value unless there are specific reasons to change it."},
{"empty-allowed", 1},
{"default", 0}}},
{"svc-port",
{{"description", "The port number for the worker's replication service."}, {"default", 25000}}},
{"fs-port",
Expand Down
29 changes: 12 additions & 17 deletions src/replica/ingest/IngestDataHttpSvcMod.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,9 @@
#include "http/BinaryEncoding.h"
#include "http/Exceptions.h"
#include "http/Method.h"
#include "qhttp/Request.h"
#include "qhttp/Response.h"
#include "replica/config/Configuration.h"
#include "replica/services/DatabaseServices.h"
#include "replica/services/ServiceProvider.h"
#include "replica/util/Csv.h"
#include "util/String.h"

Expand All @@ -38,18 +37,14 @@
#include <stdexcept>
#include <vector>

// Third party headers
#include "httplib.h"

using namespace std;
using json = nlohmann::json;
namespace qhttp = lsst::qserv::qhttp;
namespace util = lsst::qserv::util;

namespace {
/// Render the remote address of a request as text.
/// @param req The request whose 'remoteAddr' endpoint is inspected.
/// @return The address, formatted by its stream insertion operator.
string senderIpAddr(shared_ptr<qhttp::Request> const& req) {
    ostringstream addrText;
    auto const& endpoint = req->remoteAddr;
    addrText << endpoint.address();
    return addrText.str();
}

/// These keywords are found in all known binary columns types of MySQL.
vector<string> const binColTypePatterns = {"BIT", "BINARY", "BLOB"};
Expand All @@ -70,18 +65,18 @@ bool isBinaryColumnType(string const& type) {

namespace lsst::qserv::replica {

/// Entry point for handling one request: builds a module object bound to the
/// service provider, the worker name and the request/response pair, then runs
/// the requested sub-module under the given authorization requirement.
///
/// NOTE(review): two parameter lists appear below (a qhttp-based one and an
/// httplib-based one). They look like the before/after sides of a rendered
/// diff -- confirm which one is live before editing the signature.
void IngestDataHttpSvcMod::process(ServiceProvider::Ptr const& serviceProvider, string const& workerName,
shared_ptr<qhttp::Request> const& req,
shared_ptr<qhttp::Response> const& resp, string const& subModuleName,
void IngestDataHttpSvcMod::process(shared_ptr<ServiceProvider> const& serviceProvider,
string const& workerName, httplib::Request const& req,
httplib::Response& resp, string const& subModuleName,
http::AuthType const authType) {
// The module is a local object, so it lives only for the duration of this call.
IngestDataHttpSvcMod module(serviceProvider, workerName, req, resp);
// Dispatch to the sub-module selected by name, passing the authorization type along.
module.execute(subModuleName, authType);
}

/// Constructor. Forwards the provider's authKey() and adminAuthKey() together
/// with the request/response pair to the HTTP module base class, and the
/// provider plus the worker name to the IngestFileSvc base class.
///
/// NOTE(review): both a QhttpModule-based and a ChttpModule-based header appear
/// below; they look like the before/after sides of a rendered diff -- confirm
/// which one is live.
IngestDataHttpSvcMod::IngestDataHttpSvcMod(ServiceProvider::Ptr const& serviceProvider,
string const& workerName, shared_ptr<qhttp::Request> const& req,
shared_ptr<qhttp::Response> const& resp)
: http::QhttpModule(serviceProvider->authKey(), serviceProvider->adminAuthKey(), req, resp),
IngestDataHttpSvcMod::IngestDataHttpSvcMod(shared_ptr<ServiceProvider> const& serviceProvider,
string const& workerName, httplib::Request const& req,
httplib::Response& resp)
: http::ChttpModule(serviceProvider->authKey(), serviceProvider->adminAuthKey(), req, resp),
IngestFileSvc(serviceProvider, workerName) {}

/// @return The fixed context prefix of this module (note the trailing space).
string IngestDataHttpSvcMod::context() const {
    string const prefix = "INGEST-DATA-HTTP-SVC ";
    return prefix;
}
Expand Down Expand Up @@ -112,7 +107,7 @@ json IngestDataHttpSvcMod::_syncProcessData() {
_contrib.worker = workerName();

// To indicate the JSON-formatted data were streamed directly into the service
_contrib.url = "data-json://" + ::senderIpAddr(req()) + "/";
_contrib.url = "data-json://" + req().remote_addr + "/";
_contrib.charsetName =
body().optional<string>("charset_name", config->get<string>("worker", "ingest-charset-name"));

Expand Down
25 changes: 11 additions & 14 deletions src/replica/ingest/IngestDataHttpSvcMod.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,21 @@
#define LSST_QSERV_INGESTDATAHTTPSVCMOD_H

// System headers
#include <memory>
#include <string>

// Third party headers
#include "nlohmann/json.hpp"

// Qserv headers
#include "http/QhttpModule.h"
#include "http/ChttpModule.h"
#include "replica/ingest/IngestFileSvc.h"
#include "replica/ingest/TransactionContrib.h"
#include "replica/services/ServiceProvider.h"

// Forward declarations

namespace lsst::qserv::qhttp {
class Request;
class Response;
} // namespace lsst::qserv::qhttp
namespace lsst::qserv::replica {
class ServiceProvider;
} // namespace lsst::qserv::replica

// This header declarations
namespace lsst::qserv::replica {
Expand All @@ -49,7 +47,7 @@ namespace lsst::qserv::replica {
* Unlike class IngestHttpSvcMod, the current class is meant to be used for ingesting
* payloads that are pushed directly into the service over the HTTP protocol.
*/
class IngestDataHttpSvcMod : public http::QhttpModule, public IngestFileSvc {
class IngestDataHttpSvcMod : public http::ChttpModule, public IngestFileSvc {
public:
IngestDataHttpSvcMod() = delete;
IngestDataHttpSvcMod(IngestDataHttpSvcMod const&) = delete;
Expand All @@ -74,9 +72,9 @@ class IngestDataHttpSvcMod : public http::QhttpModule, public IngestFileSvc {
* @param authType The authorization requirements for the module
* @throws std::invalid_argument for unknown values of parameter 'subModuleName'
*/
static void process(ServiceProvider::Ptr const& serviceProvider, std::string const& workerName,
std::shared_ptr<qhttp::Request> const& req,
std::shared_ptr<qhttp::Response> const& resp, std::string const& subModuleName,
static void process(std::shared_ptr<ServiceProvider> const& serviceProvider,
std::string const& workerName, httplib::Request const& req, httplib::Response& resp,
std::string const& subModuleName,
http::AuthType const authType = http::AuthType::REQUIRED);

protected:
Expand All @@ -88,9 +86,8 @@ class IngestDataHttpSvcMod : public http::QhttpModule, public IngestFileSvc {

private:
/// @see method IngestDataHttpSvcMod::create()
IngestDataHttpSvcMod(ServiceProvider::Ptr const& serviceProvider, std::string const& workerName,
std::shared_ptr<qhttp::Request> const& req,
std::shared_ptr<qhttp::Response> const& resp);
IngestDataHttpSvcMod(std::shared_ptr<ServiceProvider> const& serviceProvider,
std::string const& workerName, httplib::Request const& req, httplib::Response& resp);

/// Process a table contribution request (SYNC).
nlohmann::json _syncProcessData();
Expand Down
127 changes: 60 additions & 67 deletions src/replica/ingest/IngestHttpSvc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,23 @@

// System headers
#include <functional>
#include <stdexcept>

// Qserv headers
#include "http/MetaModule.h"
#include "qhttp/Request.h"
#include "qhttp/Response.h"
#include "http/ChttpMetaModule.h"
#include "replica/config/Configuration.h"
#include "replica/ingest/IngestDataHttpSvcMod.h"
#include "replica/ingest/IngestHttpSvcMod.h"
#include "replica/ingest/IngestRequest.h"
#include "replica/ingest/IngestRequestMgr.h"
#include "replica/services/ServiceProvider.h"
#include "replica/util/Common.h"

// LSST headers
#include "lsst/log/Log.h"

// Third party headers
#include "boost/filesystem.hpp"
#include "httplib.h"
#include "nlohmann/json.hpp"

using namespace nlohmann;
Expand All @@ -51,76 +53,67 @@ LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.IngestHttpSvc");

namespace lsst::qserv::replica {

/// Factory method: allocates a new IngestHttpSvc for the given service provider
/// and worker name and returns it wrapped into a shared pointer.
///
/// NOTE(review): two variants of the signature and of the return statement
/// appear below (one using the 'Ptr' aliases, one spelling out shared_ptr).
/// They look like the before/after sides of a rendered diff -- confirm which
/// one is live.
IngestHttpSvc::Ptr IngestHttpSvc::create(ServiceProvider::Ptr const& serviceProvider,
string const& workerName) {
return IngestHttpSvc::Ptr(new IngestHttpSvc(serviceProvider, workerName));
shared_ptr<IngestHttpSvc> IngestHttpSvc::create(shared_ptr<ServiceProvider> const& serviceProvider,
string const& workerName) {
// The object is allocated with 'new' and handed straight to the shared pointer.
return shared_ptr<IngestHttpSvc>(new IngestHttpSvc(serviceProvider, workerName));
}

/// Constructor. All tunables are read from the "worker" category of the
/// provider's configuration:
///   - "http-loader-port": the port passed to the base service class
///   - "http-max-listen-conn" (HttpSvc variant) or "http-max-queued-requests"
///     (ChttpSvc variant): the second numeric argument of the base class
///   - "num-http-loader-processing-threads": the third numeric argument of the base class
///   - "num-async-loader-processing-threads": the value used to initialize '_threads'
/// It also stores the worker name and creates the ingest request manager.
///
/// NOTE(review): both an HttpSvc-based and a ChttpSvc-based initializer list
/// appear below; they look like the before/after sides of a rendered diff --
/// confirm which one is live.
IngestHttpSvc::IngestHttpSvc(ServiceProvider::Ptr const& serviceProvider, string const& workerName)
: HttpSvc(serviceProvider, serviceProvider->config()->get<uint16_t>("worker", "http-loader-port"),
serviceProvider->config()->get<unsigned int>("worker", "http-max-listen-conn"),
serviceProvider->config()->get<size_t>("worker", "num-http-loader-processing-threads")),
IngestHttpSvc::IngestHttpSvc(shared_ptr<ServiceProvider> const& serviceProvider, string const& workerName)
: ChttpSvc(context_, serviceProvider,
serviceProvider->config()->get<uint16_t>("worker", "http-loader-port"),
serviceProvider->config()->get<size_t>("worker", "http-max-queued-requests"),
serviceProvider->config()->get<size_t>("worker", "num-http-loader-processing-threads")),
_workerName(workerName),
_requestMgr(IngestRequestMgr::create(serviceProvider, workerName)),
_threads(serviceProvider->config()->get<size_t>("worker", "num-async-loader-processing-threads")) {}

/// @return A reference to the 'context_' string that this translation unit
///   also passes to the meta module and uses as a prefix in error messages.
string const& IngestHttpSvc::context() const {
    return context_;
}

void IngestHttpSvc::registerServices() {
void IngestHttpSvc::registerServices(unique_ptr<httplib::Server> const& server) {
throwIf<logic_error>(server == nullptr, context_ + "the server is not initialized");
auto const self = shared_from_base<IngestHttpSvc>();
httpServer()->addHandlers(
{{"GET", "/meta/version",
[self](qhttp::Request::Ptr const& req, qhttp::Response::Ptr const& resp) {
json const info = json::object({{"kind", "replication-worker-ingest"},
{"id", self->_workerName},
{"instance_id", self->serviceProvider()->instanceId()}});
http::MetaModule::process(::context_, info, req, resp, "VERSION");
}},
{"POST", "/ingest/data",
[self](qhttp::Request::Ptr const& req, qhttp::Response::Ptr const& resp) {
IngestDataHttpSvcMod::process(self->serviceProvider(), self->_workerName, req, resp,
"SYNC-PROCESS-DATA");
}},
{"POST", "/ingest/file",
[self](qhttp::Request::Ptr const& req, qhttp::Response::Ptr const& resp) {
IngestHttpSvcMod::process(self->serviceProvider(), self->_requestMgr, self->_workerName,
req, resp, "SYNC-PROCESS");
}},
{"PUT", "/ingest/file/:id",
[self](qhttp::Request::Ptr const& req, qhttp::Response::Ptr const& resp) {
IngestHttpSvcMod::process(self->serviceProvider(), self->_requestMgr, self->_workerName,
req, resp, "SYNC-RETRY");
}},
{"POST", "/ingest/file-async",
[self](qhttp::Request::Ptr const& req, qhttp::Response::Ptr const& resp) {
IngestHttpSvcMod::process(self->serviceProvider(), self->_requestMgr, self->_workerName,
req, resp, "ASYNC-SUBMIT");
}},
{"PUT", "/ingest/file-async/:id",
[self](qhttp::Request::Ptr const& req, qhttp::Response::Ptr const& resp) {
IngestHttpSvcMod::process(self->serviceProvider(), self->_requestMgr, self->_workerName,
req, resp, "ASYNC-RETRY");
}},
{"GET", "/ingest/file-async/:id",
[self](qhttp::Request::Ptr const& req, qhttp::Response::Ptr const& resp) {
IngestHttpSvcMod::process(self->serviceProvider(), self->_requestMgr, self->_workerName,
req, resp, "ASYNC-STATUS-BY-ID", http::AuthType::NONE);
}},
{"DELETE", "/ingest/file-async/:id",
[self](qhttp::Request::Ptr const& req, qhttp::Response::Ptr const& resp) {
IngestHttpSvcMod::process(self->serviceProvider(), self->_requestMgr, self->_workerName,
req, resp, "ASYNC-CANCEL-BY-ID");
}},
{"GET", "/ingest/file-async/trans/:id",
[self](qhttp::Request::Ptr const& req, qhttp::Response::Ptr const& resp) {
IngestHttpSvcMod::process(self->serviceProvider(), self->_requestMgr, self->_workerName,
req, resp, "ASYNC-STATUS-BY-TRANS-ID", http::AuthType::NONE);
}},
{"DELETE", "/ingest/file-async/trans/:id",
[self](qhttp::Request::Ptr const& req, qhttp::Response::Ptr const& resp) {
IngestHttpSvcMod::process(self->serviceProvider(), self->_requestMgr, self->_workerName,
req, resp, "ASYNC-CANCEL-BY-TRANS-ID");
}}});
server->Get("/meta/version", [self](httplib::Request const& req, httplib::Response& resp) {
json const info = json::object({{"kind", "replication-worker-ingest"},
{"id", self->_workerName},
{"instance_id", self->serviceProvider()->instanceId()}});
http::ChttpMetaModule::process(context_, info, req, resp, "VERSION");
});

server->Post("/ingest/data", [self](httplib::Request const& req, httplib::Response& resp) {
IngestDataHttpSvcMod::process(self->serviceProvider(), self->_workerName, req, resp,
"SYNC-PROCESS-DATA");
});
server->Post("/ingest/file", [self](httplib::Request const& req, httplib::Response& resp) {
IngestHttpSvcMod::process(self->serviceProvider(), self->_requestMgr, self->_workerName, req, resp,
"SYNC-PROCESS");
});
server->Put("/ingest/file/:id", [self](httplib::Request const& req, httplib::Response& resp) {
IngestHttpSvcMod::process(self->serviceProvider(), self->_requestMgr, self->_workerName, req, resp,
"SYNC-RETRY");
});
server->Post("/ingest/file-async", [self](httplib::Request const& req, httplib::Response& resp) {
IngestHttpSvcMod::process(self->serviceProvider(), self->_requestMgr, self->_workerName, req, resp,
"ASYNC-SUBMIT");
});
server->Put("/ingest/file-async/:id", [self](httplib::Request const& req, httplib::Response& resp) {
IngestHttpSvcMod::process(self->serviceProvider(), self->_requestMgr, self->_workerName, req, resp,
"ASYNC-RETRY");
});
server->Get("/ingest/file-async/:id", [self](httplib::Request const& req, httplib::Response& resp) {
IngestHttpSvcMod::process(self->serviceProvider(), self->_requestMgr, self->_workerName, req, resp,
"ASYNC-STATUS-BY-ID", http::AuthType::NONE);
});
server->Delete("/ingest/file-async/:id", [self](httplib::Request const& req, httplib::Response& resp) {
IngestHttpSvcMod::process(self->serviceProvider(), self->_requestMgr, self->_workerName, req, resp,
"ASYNC-CANCEL-BY-ID");
});
server->Get("/ingest/file-async/trans/:id", [self](httplib::Request const& req, httplib::Response& resp) {
IngestHttpSvcMod::process(self->serviceProvider(), self->_requestMgr, self->_workerName, req, resp,
"ASYNC-STATUS-BY-TRANS-ID", http::AuthType::NONE);
});
server->Delete("/ingest/file-async/trans/:id",
[self](httplib::Request const& req, httplib::Response& resp) {
IngestHttpSvcMod::process(self->serviceProvider(), self->_requestMgr,
self->_workerName, req, resp, "ASYNC-CANCEL-BY-TRANS-ID");
});

// Create the thread pool for processing asynchronous loading requests.
for (auto&& ptr : _threads) {
Expand Down
23 changes: 12 additions & 11 deletions src/replica/ingest/IngestHttpSvc.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,21 @@
#include <memory>
#include <string>
#include <thread>
#include <vector>

// Qserv headers
#include "replica/services/ServiceProvider.h"
#include "replica/util/HttpSvc.h"
#include "replica/util/ChttpSvc.h"

// Forward declarations
namespace lsst::qserv::replica {
class IngestRequestMgr;
class ServiceProvider;
} // namespace lsst::qserv::replica

namespace httplib {
class Server;
} // namespace httplib

// This header declarations
namespace lsst::qserv::replica {

Expand All @@ -47,10 +52,8 @@ namespace lsst::qserv::replica {
* service threads as configured in Configuration.
* @note The implementation of the class is not thread-safe.
*/
class IngestHttpSvc : public HttpSvc {
class IngestHttpSvc : public ChttpSvc {
public:
typedef std::shared_ptr<IngestHttpSvc> Ptr;

/**
* Create an instance of the service.
*
Expand All @@ -59,7 +62,8 @@ class IngestHttpSvc : public HttpSvc {
* checking consistency of the protocol).
* @return A pointer to the created object.
*/
static Ptr create(ServiceProvider::Ptr const& serviceProvider, std::string const& workerName);
static std::shared_ptr<IngestHttpSvc> create(std::shared_ptr<ServiceProvider> const& serviceProvider,
std::string const& workerName);

IngestHttpSvc() = delete;
IngestHttpSvc(IngestHttpSvc const&) = delete;
Expand All @@ -68,15 +72,12 @@ class IngestHttpSvc : public HttpSvc {
virtual ~IngestHttpSvc() = default;

protected:
/// @see HttpSvc::context()
virtual std::string const& context() const;

/// @see ChttpSvc::registerServices()
virtual void registerServices();
virtual void registerServices(std::unique_ptr<httplib::Server> const& server) override;

private:
/// @see IngestHttpSvc::create()
IngestHttpSvc(ServiceProvider::Ptr const& serviceProvider, std::string const& workerName);
IngestHttpSvc(std::shared_ptr<ServiceProvider> const& serviceProvider, std::string const& workerName);

// Input parameters
std::string const _workerName;
Expand Down
Loading

0 comments on commit 40a0184

Please sign in to comment.