From 7b1fefd263be0fd452620af8154c69a8ee387b47 Mon Sep 17 00:00:00 2001 From: Igor Gaponenko Date: Mon, 16 Sep 2024 18:44:46 -0700 Subject: [PATCH 1/7] Incremented the version number of the REST API --- src/admin/python/lsst/qserv/admin/replicationInterface.py | 2 +- src/http/ChttpMetaModule.cc | 2 +- src/http/MetaModule.cc | 2 +- src/www/qserv/js/Common.js | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/admin/python/lsst/qserv/admin/replicationInterface.py b/src/admin/python/lsst/qserv/admin/replicationInterface.py index 02e92cff1..a536d9266 100644 --- a/src/admin/python/lsst/qserv/admin/replicationInterface.py +++ b/src/admin/python/lsst/qserv/admin/replicationInterface.py @@ -201,7 +201,7 @@ def __init__( self.repl_ctrl = urlparse(repl_ctrl_uri) self.auth_key = auth_key self.admin_auth_key = admin_auth_key - self.repl_api_version = 37 + self.repl_api_version = 38 _log.debug(f"ReplicationInterface %s", self.repl_ctrl) def version(self) -> str: diff --git a/src/http/ChttpMetaModule.cc b/src/http/ChttpMetaModule.cc index 2794a21e0..5a592b2c6 100644 --- a/src/http/ChttpMetaModule.cc +++ b/src/http/ChttpMetaModule.cc @@ -37,7 +37,7 @@ string const adminAuthKey; namespace lsst::qserv::http { -unsigned int const ChttpMetaModule::version = 37; +unsigned int const ChttpMetaModule::version = 38; void ChttpMetaModule::process(string const& context, nlohmann::json const& info, httplib::Request const& req, httplib::Response& resp, string const& subModuleName) { diff --git a/src/http/MetaModule.cc b/src/http/MetaModule.cc index 194f79b39..f9f0be36c 100644 --- a/src/http/MetaModule.cc +++ b/src/http/MetaModule.cc @@ -37,7 +37,7 @@ string const adminAuthKey; namespace lsst::qserv::http { -unsigned int const MetaModule::version = 37; +unsigned int const MetaModule::version = 38; void MetaModule::process(string const& context, nlohmann::json const& info, shared_ptr const& req, shared_ptr const& resp, diff --git a/src/www/qserv/js/Common.js 
b/src/www/qserv/js/Common.js index 2d6240833..70c1d7c0f 100644 --- a/src/www/qserv/js/Common.js +++ b/src/www/qserv/js/Common.js @@ -6,7 +6,7 @@ function(sqlFormatter, _) { class Common { - static RestAPIVersion = 37; + static RestAPIVersion = 38; static query2text(query, expanded) { if (expanded) { if (query.length > Common._max_expanded_length) { From 015541d2193a716cb6db5be3bf627a5f575f8083 Mon Sep 17 00:00:00 2001 From: Igor Gaponenko Date: Mon, 16 Sep 2024 18:42:05 -0700 Subject: [PATCH 2/7] Extended Url scheme --- src/http/Url.cc | 33 ++++++++++++++++++++++++++++----- src/http/Url.h | 2 +- src/http/testUrl.cc | 14 +++++++++++--- 3 files changed, 40 insertions(+), 9 deletions(-) diff --git a/src/http/Url.cc b/src/http/Url.cc index cfbdae7b3..8ca5b813c 100644 --- a/src/http/Url.cc +++ b/src/http/Url.cc @@ -36,27 +36,27 @@ namespace lsst::qserv::http { Url::Url(string const& url) : _url(url) { _translate(); } string const& Url::fileHost() const { - if ((_scheme == DATA_JSON) || (_scheme == FILE)) return _fileHost; + if ((_scheme == DATA_JSON) || (_scheme == DATA_CSV) || (_scheme == FILE)) return _fileHost; throw logic_error(_error(__func__, "not a file resource.")); } string const& Url::filePath() const { - if ((_scheme == DATA_JSON) || (_scheme == FILE)) return _filePath; + if ((_scheme == DATA_JSON) || (_scheme == DATA_CSV) || (_scheme == FILE)) return _filePath; throw logic_error(_error(__func__, "not a file resource.")); } string const& Url::host() const { - if ((_scheme != DATA_JSON) && (_scheme != FILE)) return _host; + if ((_scheme == HTTP) || (_scheme == HTTPS)) return _host; throw logic_error(_error(__func__, "not an HTTP/HTTPS resource.")); } uint16_t Url::port() const { - if ((_scheme != DATA_JSON) && (_scheme != FILE)) return _port; + if ((_scheme == HTTP) || (_scheme == HTTPS)) return _port; throw logic_error(_error(__func__, "not an HTTP/HTTPS resource.")); } string const& Url::target() const { - if ((_scheme != DATA_JSON) && (_scheme != FILE)) 
return _target; + if ((_scheme == HTTP) || (_scheme == HTTPS)) return _target; throw logic_error(_error(__func__, "not an HTTP/HTTPS resource.")); } @@ -66,6 +66,7 @@ void Url::_translate() { if (_url.empty()) throw invalid_argument(_error(__func__, "url is empty.")); static map const schemes = {{"data-json://", Scheme::DATA_JSON}, + {"data-csv://", Scheme::DATA_CSV}, {"file://", Scheme::FILE}, {"http://", Scheme::HTTP}, {"https://", Scheme::HTTPS}}; @@ -84,6 +85,28 @@ void Url::_translate() { return; } } + } else if (Scheme::DATA_CSV == scheme) { + // This scheme assumes the following format: "data-csv:///[]" + string const hostFilePath = _url.substr(prefix.length()); + string::size_type const pos = hostFilePath.find_first_of('/'); + if (pos != string::npos) { + if (pos == 0) { + // This URL doesn't have the host name: data-csv:/// + if (hostFilePath.length() > 1) { + _scheme = scheme; + _filePath = hostFilePath; + return; + } + } else { + // This URL has the host name: file:/// + if (hostFilePath.length() > pos + 1) { + _scheme = scheme; + _fileHost = hostFilePath.substr(0, pos); + _filePath = hostFilePath.substr(pos); + return; + } + } + } } else if (Scheme::FILE == scheme) { // Note that the file path should be always absolute in the URL. It's impossible to // pass a relative location of a file in this scheme. The file path is required to diff --git a/src/http/Url.h b/src/http/Url.h index 339a84aff..a4c36bba6 100644 --- a/src/http/Url.h +++ b/src/http/Url.h @@ -34,7 +34,7 @@ namespace lsst::qserv::http { class Url { public: /// Types of resources - enum Scheme { DATA_JSON, FILE, HTTP, HTTPS }; + enum Scheme { DATA_JSON, DATA_CSV, FILE, HTTP, HTTPS }; // Default construction is prohibited to avoid extra complexity in managing // a "valid" state of the resource object. 
diff --git a/src/http/testUrl.cc b/src/http/testUrl.cc index 23d59a4d1..803a984df 100644 --- a/src/http/testUrl.cc +++ b/src/http/testUrl.cc @@ -54,13 +54,21 @@ BOOST_AUTO_TEST_CASE(UrlTest) { BOOST_CHECK_THROW({ ptr.reset(new Url("data-json://h/f")); }, invalid_argument); // The well-formed URL - string dataUrl = "data-json://h/"; - BOOST_REQUIRE_NO_THROW({ ptr.reset(new Url(dataUrl)); }); - BOOST_REQUIRE_NO_THROW({ BOOST_CHECK_EQUAL(ptr->url(), dataUrl); }); + string dataJsonUrl = "data-json://h/"; + BOOST_REQUIRE_NO_THROW({ ptr.reset(new Url(dataJsonUrl)); }); + BOOST_REQUIRE_NO_THROW({ BOOST_CHECK_EQUAL(ptr->url(), dataJsonUrl); }); BOOST_REQUIRE_NO_THROW({ BOOST_CHECK_EQUAL(ptr->scheme(), Url::Scheme::DATA_JSON); }); BOOST_REQUIRE_NO_THROW({ BOOST_CHECK_EQUAL(ptr->fileHost(), "h"); }); BOOST_REQUIRE_NO_THROW({ BOOST_CHECK_EQUAL(ptr->filePath(), string()); }); + // The well-formed URL + string dataCsvUrl = "data-csv://h/f"; + BOOST_REQUIRE_NO_THROW({ ptr.reset(new Url(dataCsvUrl)); }); + BOOST_REQUIRE_NO_THROW({ BOOST_CHECK_EQUAL(ptr->url(), dataCsvUrl); }); + BOOST_REQUIRE_NO_THROW({ BOOST_CHECK_EQUAL(ptr->scheme(), Url::Scheme::DATA_CSV); }); + BOOST_REQUIRE_NO_THROW({ BOOST_CHECK_EQUAL(ptr->fileHost(), "h"); }); + BOOST_REQUIRE_NO_THROW({ BOOST_CHECK_EQUAL(ptr->filePath(), "/f"); }); + // Resources which are too short to include anyting by the name of a scheme // aren't allowed. BOOST_CHECK_THROW({ ptr.reset(new Url("file:///")); }, invalid_argument); From b751ebe3f507d925894c7c316338c9128d946a77 Mon Sep 17 00:00:00 2001 From: Igor Gaponenko Date: Tue, 17 Sep 2024 17:29:19 -0700 Subject: [PATCH 3/7] Reinforced API for retrieving parameters of a request from the request body Added a unit test for testing the request body class http::RequestBodyJSON. 
--- src/http/CMakeLists.txt | 1 + src/http/RequestBodyJSON.cc | 59 +++++++++++++++++++++++ src/http/RequestBodyJSON.h | 21 +++++++++ src/http/testRequestBodyJSON.cc | 84 +++++++++++++++++++++++++++++++++ 4 files changed, 165 insertions(+) create mode 100644 src/http/testRequestBodyJSON.cc diff --git a/src/http/CMakeLists.txt b/src/http/CMakeLists.txt index 800a7be83..2bf9e2bbf 100644 --- a/src/http/CMakeLists.txt +++ b/src/http/CMakeLists.txt @@ -47,6 +47,7 @@ endfunction() http_tests( testAsyncReq + testRequestBodyJSON testRequestQuery testUrl ) diff --git a/src/http/RequestBodyJSON.cc b/src/http/RequestBodyJSON.cc index 7e52a9d68..5c79f609c 100644 --- a/src/http/RequestBodyJSON.cc +++ b/src/http/RequestBodyJSON.cc @@ -22,6 +22,9 @@ // Class header #include "http/RequestBodyJSON.h" +// Qserv headers +#include "global/stringUtil.h" + using namespace std; using json = nlohmann::json; @@ -37,4 +40,60 @@ bool RequestBodyJSON::has(json const& obj, string const& name) const { bool RequestBodyJSON::has(string const& name) const { return has(objJson, name); } +unsigned int RequestBodyJSON::requiredUInt(string const& name) const { + string const context = "RequestBodyJSON::" + string(__func__) + " "; + json const value = _get(__func__, name); + if (value.is_number_unsigned()) { + return value; + } else if (value.is_number_integer()) { + int const ret = value; + if (ret >= 0) return ret; + throw invalid_argument(context + "a value of the required parameter " + name + + " is a negative integer"); + } else if (value.is_string()) { + string const str = value; + try { + return qserv::stoui(str); + } catch (exception const& ex) { + ; + } + } + throw invalid_argument(context + "a value of the required parameter " + name + + " is not an unsigned integer"); +} + +unsigned int RequestBodyJSON::optionalUInt(string const& name, unsigned int defaultValue) const { + if (!has(name)) return defaultValue; + return requiredUInt(name); +} + +int RequestBodyJSON::requiredInt(string const& 
name) const { + json const value = _get(__func__, name); + if (value.is_number_integer()) { + return value; + } else if (value.is_string()) { + string const str = value; + try { + return stoi(str); + } catch (exception const& ex) { + ; + } + } + throw invalid_argument("RequestBodyJSON::" + string(__func__) + " a value of the required parameter " + + name + " is not a signed integer"); +} + +int RequestBodyJSON::optionalInt(string const& name, int defaultValue) const { + if (!has(name)) return defaultValue; + return requiredInt(name); +} + +json RequestBodyJSON::_get(string const& func, string const& name) const { + if (!has(name)) { + throw invalid_argument("RequestBodyJSON::" + func + " required parameter " + name + + " is missing in the request body"); + } + return objJson.at(name); +} + } // namespace lsst::qserv::http \ No newline at end of file diff --git a/src/http/RequestBodyJSON.h b/src/http/RequestBodyJSON.h index 896250d32..6bf6a8068 100644 --- a/src/http/RequestBodyJSON.h +++ b/src/http/RequestBodyJSON.h @@ -90,6 +90,18 @@ class RequestBodyJSON { return required(objJson, name); } + // The following methods are used to extract the values of the parameters from the JSON object + // where they could be stored as a string or as a number. The methods will try to convert the + // value to the desired type if it's a string. + // The methods will throw an exception if the parameter wasn't found, or if its value + // is not an integer. + + unsigned int requiredUInt(std::string const& name) const; + unsigned int optionalUInt(std::string const& name, unsigned int defaultValue = 0) const; + + int requiredInt(std::string const& name) const; + int optionalInt(std::string const& name, int defaultValue = 0) const; + /** * Return a value of a required parameter. Also ensure that the value is permitted. * @param name The name of a parameter. 
@@ -184,6 +196,15 @@ class RequestBodyJSON { return permitted.empty() or std::find(permitted.cbegin(), permitted.cend(), value) != permitted.cend(); } + + /** + * The helper method for finding and returning a value of a required parameter. + * @param func The name of the calling context. + * @param name The name of a parameter. + * @return A value of the parameter. + * @throw std::invalid_argument If the parameter wasn't found. + */ + nlohmann::json _get(std::string const& func, std::string const& name) const; }; } // namespace lsst::qserv::http diff --git a/src/http/testRequestBodyJSON.cc b/src/http/testRequestBodyJSON.cc new file mode 100644 index 000000000..c1dd5d1b9 --- /dev/null +++ b/src/http/testRequestBodyJSON.cc @@ -0,0 +1,84 @@ +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . 
+ */ + +// System headers +#include +#include +#include +#include +#include + +// Qserv headers +#include "http/RequestBodyJSON.h" + +// Boost unit test header +#define BOOST_TEST_MODULE RequestBodyJSON +#include + +using namespace std; +namespace test = boost::test_tools; +using namespace lsst::qserv::http; + +BOOST_AUTO_TEST_SUITE(Suite) + +BOOST_AUTO_TEST_CASE(RequestBodyJSONTest) { + RequestBodyJSON body; + BOOST_CHECK(body.objJson.empty()); + + body.objJson["key1"] = "value1"; + body.objJson["key2"] = 2; + body.objJson["key3"] = -3; + body.objJson["key4"] = 4.0f; + body.objJson["key5"] = "5"; + body.objJson["key6"] = "-6"; + + for (auto const& [key, value] : body.objJson.items()) { + BOOST_REQUIRE_NO_THROW(BOOST_CHECK(body.has(key))); + } + + // Test if required parameters are handled correctly. + BOOST_REQUIRE_NO_THROW(BOOST_CHECK_EQUAL(body.required("key1"), "value1")); + BOOST_REQUIRE_NO_THROW(BOOST_CHECK_EQUAL(body.required("key2"), 2U)); + BOOST_REQUIRE_NO_THROW(BOOST_CHECK_EQUAL(body.required("key2"), 2)); + BOOST_REQUIRE_NO_THROW(BOOST_CHECK_EQUAL(body.required("key3"), -3)); + BOOST_REQUIRE_NO_THROW(BOOST_CHECK_EQUAL(body.required("key4"), 4.0f)); + BOOST_REQUIRE_NO_THROW(BOOST_CHECK_EQUAL(body.requiredUInt("key2"), 2U)); + BOOST_REQUIRE_NO_THROW(BOOST_CHECK_EQUAL(body.requiredInt("key2"), 2)); + BOOST_REQUIRE_NO_THROW(BOOST_CHECK_EQUAL(body.requiredInt("key3"), -3)); + + // Test if missing parameters are handled correctly. + string const missingKey = "missing_key"; + BOOST_REQUIRE_NO_THROW(BOOST_CHECK(!body.has("missingKey"))); + BOOST_CHECK_THROW(body.required("missingKey"), invalid_argument); + BOOST_CHECK_THROW(body.requiredUInt("missingKey"), invalid_argument); + BOOST_CHECK_THROW(body.requiredInt("missingKey"), invalid_argument); + + // Test if optional parameters are handled correctly. 
+ BOOST_REQUIRE_NO_THROW(BOOST_CHECK_EQUAL(body.optional("missingKey", string()), string())); + BOOST_REQUIRE_NO_THROW( + BOOST_CHECK_EQUAL(body.optional("missingKey", "default"), string("default"))); + BOOST_REQUIRE_NO_THROW(BOOST_CHECK_EQUAL(body.optionalUInt("missingKey"), 0U)); + BOOST_REQUIRE_NO_THROW(BOOST_CHECK_EQUAL(body.optionalUInt("missingKey", 1), 1U)); + BOOST_REQUIRE_NO_THROW(BOOST_CHECK_EQUAL(body.optionalInt("missingKey", 0), 0)); + BOOST_REQUIRE_NO_THROW(BOOST_CHECK_EQUAL(body.optionalInt("missingKey", 2), 2)); +} + +BOOST_AUTO_TEST_SUITE_END() From dab5eeea9c2dedf761cb5ad55bc6dc549d968257 Mon Sep 17 00:00:00 2001 From: Igor Gaponenko Date: Sun, 15 Sep 2024 13:37:12 -0700 Subject: [PATCH 4/7] Refactoring in the base classes of the Qserv REST services Class http::Module was split into two modules: http::BaseModule and a reduced version of http::Module. The new hierarchy is meant to prepare ground for introducing an intermediate base class that will support data streaming (and file uploading requests) in which the request body can't be read at once. The new class will inherit from http::BaseModule. --- src/http/BaseModule.cc | 160 ++++++++++++++++++++++++++ src/http/BaseModule.h | 242 ++++++++++++++++++++++++++++++++++++++++ src/http/CMakeLists.txt | 1 + src/http/Module.cc | 141 ++--------------------- src/http/Module.h | 180 ++---------------------------- 5 files changed, 420 insertions(+), 304 deletions(-) create mode 100644 src/http/BaseModule.cc create mode 100644 src/http/BaseModule.h diff --git a/src/http/BaseModule.cc b/src/http/BaseModule.cc new file mode 100644 index 000000000..d87762d12 --- /dev/null +++ b/src/http/BaseModule.cc @@ -0,0 +1,160 @@ +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). 
+ * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ + +// Class header +#include "http/BaseModule.h" + +// Qserv headers +#include "http/Exceptions.h" +#include "http/MetaModule.h" +#include "http/RequestQuery.h" + +// LSST headers +#include "lsst/log/Log.h" + +// System headers +#include + +using namespace std; +using json = nlohmann::json; + +namespace { +LOG_LOGGER _log = LOG_GET("lsst.qserv.http.BaseModule"); + +string packWarnings(list const& warnings) { + string packed; + for (auto const& msg : warnings) { + if (!packed.empty()) packed += "; "; + packed += msg; + } + return packed; +} +} // namespace + +namespace lsst::qserv::http { + +BaseModule::BaseModule(string const& authKey, string const& adminAuthKey) + : _authKey(authKey), _adminAuthKey(adminAuthKey) {} + +void BaseModule::checkApiVersion(string const& func, unsigned int minVersion, string const& warning) const { + unsigned int const maxVersion = MetaModule::version; + unsigned int version = 0; + string const versionAttrName = "version"; + json const errorEx = json::object({{"min_version", minVersion}, {"max_version", maxVersion}}); + + // Intercept exceptions thrown when converting the attribute's value (if provided) + // in order to inject the allowed range of the version numbers into the extended + // error sent back to the caller. 
+ // + // Note that requests sent w/o explicitly specified API version will still be + // processed. In this case a warning will be sent in the response object. + try { + if (method() == "GET") { + if (!query().has(versionAttrName)) { + warn("No version number was provided in the request's query."); + return; + } + version = query().requiredUInt(versionAttrName); + } else { + if (!body().has(versionAttrName)) { + warn("No version number was provided in the request's body."); + return; + } + version = body().requiredUInt(versionAttrName); + } + } catch (...) { + throw http::Error(func, "The required parameter " + versionAttrName + " is not a number.", errorEx); + } + if (!(minVersion <= version && version <= maxVersion)) { + if (!warning.empty()) warn(warning); + throw http::Error(func, + "The requested version " + to_string(version) + + " of the API is not in the range supported by the service.", + errorEx); + } +} + +void BaseModule::enforceInstanceId(string const& func, string const& requiredInstanceId) const { + string const instanceId = method() == "GET" ? query().requiredString("instance_id") + : body().required("instance_id"); + debug(func, "instance_id: " + instanceId); + if (instanceId != requiredInstanceId) { + throw invalid_argument(context() + func + " Qserv instance identifier mismatch. 
Client sent '" + + instanceId + "' instead of '" + requiredInstanceId + "'."); + } +} + +void BaseModule::info(string const& msg) const { LOGS(_log, LOG_LVL_INFO, context() << msg); } + +void BaseModule::debug(string const& msg) const { LOGS(_log, LOG_LVL_DEBUG, context() << msg); } + +void BaseModule::warn(string const& msg) const { + LOGS(_log, LOG_LVL_WARN, context() << msg); + _warnings.push_back(msg); +} + +void BaseModule::error(string const& msg) const { LOGS(_log, LOG_LVL_ERROR, context() << msg); } + +void BaseModule::sendError(string const& func, string const& errorMsg, json const& errorExt) { + error(func, errorMsg); + json result; + result["success"] = 0; + result["error"] = errorMsg; + result["error_ext"] = errorExt.is_null() ? json::object() : errorExt; + result["warning"] = ::packWarnings(_warnings); + sendResponse(result.dump(), "application/json"); +} + +void BaseModule::sendData(json& result) { + result["success"] = 1; + result["error"] = ""; + result["error_ext"] = json::object(); + result["warning"] = ::packWarnings(_warnings); + sendResponse(result.dump(), "application/json"); +} + +void BaseModule::enforceAuthorization(http::AuthType const authType) { + if (authType != http::AuthType::REQUIRED) return; + if (body().has("admin_auth_key")) { + auto const adminAuthKey = body().required("admin_auth_key"); + if (adminAuthKey != _adminAuthKey) { + throw AuthError(context() + + "administrator's authorization key 'admin_auth_key' in the request" + " doesn't match the one in server configuration"); + } + _isAdmin = true; + return; + } + if (body().has("auth_key")) { + auto const authKey = body().required("auth_key"); + if (authKey != _authKey) { + throw AuthError(context() + + "authorization key 'auth_key' in the request doesn't match" + " the one in server configuration"); + } + return; + } + throw AuthError(context() + + "none of the authorization keys 'auth_key' or 'admin_auth_key' was found" + " in the request. 
Please, provide one."); +} + +} // namespace lsst::qserv::http diff --git a/src/http/BaseModule.h b/src/http/BaseModule.h new file mode 100644 index 000000000..21e1b75ad --- /dev/null +++ b/src/http/BaseModule.h @@ -0,0 +1,242 @@ + +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ +#ifndef LSST_QSERV_HTTP_BASEMODULE_H +#define LSST_QSERV_HTTP_BASEMODULE_H + +// System headers +#include +#include +#include +#include +#include + +// Third party headers +#include "nlohmann/json.hpp" + +// Qserv headers +#include "http/RequestBodyJSON.h" + +// Forward declarations +namespace lsst::qserv::http { +class RequestQuery; +} // namespace lsst::qserv::http + +// This header declarations +namespace lsst::qserv::http { + +/// The enumeration type which is used for configuring/enforcing +/// module's authorization requirements. +enum class AuthType { REQUIRED, NONE }; + +/// Class AuthError represent exceptions thrown when the authorization +/// requirements aren't met. +class AuthError : public std::invalid_argument { +public: + using std::invalid_argument::invalid_argument; +}; + +/** + * Class BaseModule is the very base class for the request processing modules of the HTTP servers. 
+ */ +class BaseModule { +public: + BaseModule() = delete; + BaseModule(BaseModule const&) = delete; + BaseModule& operator=(BaseModule const&) = delete; + + virtual ~BaseModule() = default; + + /** + * Invokes a subclass-specific request processing provided by implementations. + * + * @param subModuleName this optional parameter allows modules to have + * multiple sub-modules. A value of this parameter will be forwarded to + * the subclass-specific implementation of the pure virtual method + * Module::executeImpl(). + * @param authType Authorization requirements of the module. If 'http::AuthType::REQUIRED' is + * requested then the method will enforce the authorization. A lack of required + * authorization key in a request, or an incorrect value of such key would result + * in an error sent back to a client. + * + * @note For requests with 'http::AuthType::REQUIRED' authorization keys must be sent + * by a requestor in the body of a request. There are two types of keys. The normal + * authorization level key "auth_key" is required for most operations resulting + * in any changes made to a persistent or transient states of Qserv, and its + * Replication/Ingest systems. The key is also required when requesting sensitive + * information from the system. The "administrator"-level "admin_auth_key" supersedes + * "auth_key" by adding elevated privileges to requests. If "admin_auth_key" is found + * in the body then "auth_key" (if any provided) will be ignored, and it won't be + * validated if present. It's up to a specific module to decide on how to (or if) + * use the administrative privileges. + */ + virtual void execute(std::string const& subModuleName = std::string(), + http::AuthType const authType = http::AuthType::NONE) = 0; + +protected: + /** + * @param authKey An authorization key for operations which require extra security. + * @param adminAuthKey An administrator-level authorization key. 
+ */ + BaseModule(std::string const& authKey, std::string const& adminAuthKey); + + /// @return Authorization level of the request. + bool isAdmin() const { return _isAdmin; } + + /// @return The method of a request. + virtual std::string method() const = 0; + + /// @return Captured URL path elements. + virtual std::unordered_map params() const = 0; + + /// @return Parameters of the request's query captured from the request's URL. + virtual RequestQuery query() const = 0; + + /// @return Optional parameters of a request extracted from the request's body (if any). + RequestBodyJSON const& body() const { return _body; } + + /// @return A reference to the modifiable object that stores optional parameters of a request + /// extracted from the request's body (if any). The method is used by subclasses to set the + /// body of a request. + RequestBodyJSON& body() { return _body; } + + // Message loggers for the corresponding log levels + + void info(std::string const& msg) const; + void info(std::string const& context, std::string const& msg) const { info(context + " " + msg); } + + void debug(std::string const& msg) const; + void debug(std::string const& context, std::string const& msg) const { debug(context + " " + msg); } + + void warn(std::string const& msg) const; + void warn(std::string const& context, std::string const& msg) const { warn(context + " " + msg); } + + void error(std::string const& msg) const; + void error(std::string const& context, std::string const& msg) const { error(context + " " + msg); } + + /** + * @return A context in which a module runs. This is used for error and info reporting. + * The method is required to be implemented by a subclass. + */ + virtual std::string context() const = 0; + + /** + * @brief Check the API version in the request's query or its body. + * + * The version is specified in the optional attribute 'version'. 
If the attribute + * was found present in the request then its value would be required to be within + * the specified minimum and the implied maximum, that's the current version number + * of the REST API. In case if no version info was found in the request the method + * will simply note this and the service will report a lack of the version number + * in the "warning" attribute at the returned JSON object. + * + * The method will look for the version attribute in the query string of the "GET" + * requests. For requests that are called using methods "POST", "PUT" or "DELETE" + * the attribute will be located in the request's body. + * + * @note Services that are calling the method should adjust the minimum version + * number to be the same as the current value in the implementation of + * http::MetaModule::version if the expected JSON schema of the corresponding + * request changes. + * @see http::MetaModule::version + * + * @param func The name of the calling context (it's used for error reporting). + * @param minVersion The minimum version number of the valid version range. + * @param warning The optional warning to be sent to a client along with the usual + * error if the minimum version requirement won't be satisfied. This mechanism + * allows REST services to notify clients on possible problems encountered + * when validating parameters of a request. + * + * @throw http::Error if a value of the attribute is not within the expected range. + */ + void checkApiVersion(std::string const& func, unsigned int minVersion, + std::string const& warning = std::string()) const; + + /** + * @brief Check if the specified identifier of the Qserv instance that was received + * from a client matches the one that is required in the service context. Throw + * an exception in case of mismatch. + * + * @param func The name of the calling context (it's used for error reporting). + * @param requiredInstanceId An instance identifier required in the service context. 
+ * @throws std::invalid_argument If the identifiers didn't match. + */ + void enforceInstanceId(std::string const& func, std::string const& requiredInstanceId) const; + + /** + * Send a response back to a requester of a service. + * @param content The content to be sent back. + * @param contentType The type of the content to be sent back. + */ + virtual void sendResponse(std::string const& content, std::string const& contentType) = 0; + + /** + * Inspect the body of a request for a presence of a user-supplied authorization key. + * Its value will be compared against a value of the corresponding configuration + * parameter of the service (processorConfig) passed into the constructor of the class. + * In the absence of the message body, or in the absence of the key in the body, or + * in case of any mismatch between the keys would result in an exception thrown. + * + * @param authType Authorization requirements of the module. If 'http::AuthType::REQUIRED' is + * requested then the method will enforce the authorization. A lack of required + * authorization key in a request, or an incorrect value of such key would result + * in an error sent back to a client. + * @throw AuthError This exception is thrown if the authorization requirements weren't met. + */ + void enforceAuthorization(http::AuthType const authType = http::AuthType::NONE); + + /** + * Report an error condition and send an error message back to a requester + * of a service. + * + * @param func The name of a context from which the operation was initiated. + * @param errorMsg An error condition to be reported. + * @param errorExt (optional) The additional information on the error. + */ + void sendError(std::string const& func, std::string const& errorMsg, + nlohmann::json const& errorExt = nlohmann::json::object()); + + /** + * Report a result back to a requester of a service upon its successful + * completion. + * @param result A JSON object to be sent back. 
+ */ + void sendData(nlohmann::json& result); + +private: + // Input parameters + std::string const _authKey; + std::string const _adminAuthKey; + + /// The flag indicating if a request has been granted the "administrator"-level privileges. + bool _isAdmin = false; + + /// The body of a request is initialized by BaseModule::execute(). + RequestBodyJSON _body; + + /// The optional warning message to be sent to a caller if the API version + /// number wasn't mentoned in the request. + mutable std::list _warnings; +}; + +} // namespace lsst::qserv::http + +#endif // LSST_QSERV_HTTP_BASEMODULE_H diff --git a/src/http/CMakeLists.txt b/src/http/CMakeLists.txt index 2bf9e2bbf..f6fb8f45d 100644 --- a/src/http/CMakeLists.txt +++ b/src/http/CMakeLists.txt @@ -2,6 +2,7 @@ add_library(http SHARED) target_sources(http PRIVATE AsyncReq.cc + BaseModule.cc BinaryEncoding.cc ChttpMetaModule.cc ChttpModule.cc diff --git a/src/http/Module.cc b/src/http/Module.cc index 3156d7c3c..41916b293 100644 --- a/src/http/Module.cc +++ b/src/http/Module.cc @@ -24,11 +24,6 @@ // Qserv headers #include "http/Exceptions.h" -#include "http/MetaModule.h" -#include "http/RequestQuery.h" - -// LSST headers -#include "lsst/log/Log.h" // System headers #include @@ -36,125 +31,34 @@ using namespace std; using json = nlohmann::json; -namespace { -LOG_LOGGER _log = LOG_GET("lsst.qserv.http.Module"); - -string packWarnings(list const& warnings) { - string packed; - for (auto const& msg : warnings) { - if (!packed.empty()) packed += "; "; - packed += msg; - } - return packed; -} -} // namespace - namespace lsst::qserv::http { -Module::Module(string const& authKey, string const& adminAuthKey) - : _authKey(authKey), _adminAuthKey(adminAuthKey) {} +Module::Module(string const& authKey, string const& adminAuthKey) : BaseModule(authKey, adminAuthKey) {} void Module::execute(string const& subModuleName, http::AuthType const authType) { try { _parseRequestBodyJSON(); - if (authType == http::AuthType::REQUIRED) 
_enforceAuthorization(); + enforceAuthorization(authType); json result = executeImpl(subModuleName); - _sendData(result); + sendData(result); } catch (AuthError const& ex) { - _sendError(__func__, "failed to pass authorization requirements, ex: " + string(ex.what())); + sendError(__func__, "failed to pass authorization requirements, ex: " + string(ex.what())); } catch (http::Error const& ex) { - _sendError(ex.func(), ex.what(), ex.errorExt()); + sendError(ex.func(), ex.what(), ex.errorExt()); } catch (invalid_argument const& ex) { - _sendError(__func__, "invalid parameters of the request, ex: " + string(ex.what())); + sendError(__func__, "invalid parameters of the request, ex: " + string(ex.what())); } catch (exception const& ex) { - _sendError(__func__, "operation failed due to: " + string(ex.what())); - } -} - -void Module::checkApiVersion(string const& func, unsigned int minVersion, string const& warning) const { - unsigned int const maxVersion = MetaModule::version; - unsigned int version = 0; - string const versionAttrName = "version"; - json const errorEx = json::object({{"min_version", minVersion}, {"max_version", maxVersion}}); - - // Intercept exceptions thrown when converting the attribute's value (if provided) - // in order to inject the allowed range of the version numbers into the extended - // error sent back to the caller. - // - // Note that requests sent w/o explicitly specified API version will still be - // processed. In this case a warning will be sent in the response object. - try { - if (method() == "GET") { - if (!query().has(versionAttrName)) { - warn("No version number was provided in the request's query."); - return; - } - version = query().requiredUInt(versionAttrName); - } else { - if (!body().has(versionAttrName)) { - warn("No version number was provided in the request's body."); - return; - } - version = body().required(versionAttrName); - } - } catch (...) 
{ - throw http::Error(func, "The required parameter " + versionAttrName + " is not a number.", errorEx); - } - if (!(minVersion <= version && version <= maxVersion)) { - if (!warning.empty()) warn(warning); - throw http::Error(func, - "The requested version " + to_string(version) + - " of the API is not in the range supported by the service.", - errorEx); - } -} - -void Module::enforceInstanceId(string const& func, string const& requiredInstanceId) const { - string const instanceId = method() == "GET" ? query().requiredString("instance_id") - : body().required("instance_id"); - debug(func, "instance_id: " + instanceId); - if (instanceId != requiredInstanceId) { - throw invalid_argument(context() + func + " Qserv instance identifier mismatch. Client sent '" + - instanceId + "' instead of '" + requiredInstanceId + "'."); + sendError(__func__, "operation failed due to: " + string(ex.what())); } } -void Module::info(string const& msg) const { LOGS(_log, LOG_LVL_INFO, context() << msg); } - -void Module::debug(string const& msg) const { LOGS(_log, LOG_LVL_DEBUG, context() << msg); } - -void Module::warn(string const& msg) const { - LOGS(_log, LOG_LVL_WARN, context() << msg); - _warnings.push_back(msg); -} - -void Module::error(string const& msg) const { LOGS(_log, LOG_LVL_ERROR, context() << msg); } - -void Module::_sendError(string const& func, string const& errorMsg, json const& errorExt) { - error(func, errorMsg); - json result; - result["success"] = 0; - result["error"] = errorMsg; - result["error_ext"] = errorExt.is_null() ? 
json::object() : errorExt; - result["warning"] = ::packWarnings(_warnings); - sendResponse(result.dump(), "application/json"); -} - -void Module::_sendData(json& result) { - result["success"] = 1; - result["error"] = ""; - result["error_ext"] = json::object(); - result["warning"] = ::packWarnings(_warnings); - sendResponse(result.dump(), "application/json"); -} - void Module::_parseRequestBodyJSON() { string content; getRequestBody(content, "application/json"); if (!content.empty()) { try { - _body.objJson = json::parse(content); - if (_body.objJson.is_null() || _body.objJson.is_object()) return; + body().objJson = json::parse(content); + if (body().objJson.is_null() || body().objJson.is_object()) return; } catch (...) { // Not really interested in knowing specific details of the exception. // All what matters here is that the string can't be parsed into @@ -162,33 +66,8 @@ void Module::_parseRequestBodyJSON() { // after this block ends. ; } - throw std::invalid_argument("invalid format of the request body. A simple JSON object was expected"); - } -} - -void Module::_enforceAuthorization() { - if (body().has("admin_auth_key")) { - auto const adminAuthKey = body().required("admin_auth_key"); - if (adminAuthKey != _adminAuthKey) { - throw AuthError(context() + - "administrator's authorization key 'admin_auth_key' in the request" - " doesn't match the one in server configuration"); - } - _isAdmin = true; - return; - } - if (body().has("auth_key")) { - auto const authKey = body().required("auth_key"); - if (authKey != _authKey) { - throw AuthError(context() + - "authorization key 'auth_key' in the request doesn't match" - " the one in server configuration"); - } - return; + throw invalid_argument("invalid format of the request body. A simple JSON object was expected"); } - throw AuthError(context() + - "none of the authorization keys 'auth_key' or 'admin_auth_key' was found" - " in the request. 
Please, provide one."); } } // namespace lsst::qserv::http diff --git a/src/http/Module.h b/src/http/Module.h index 009d2a192..a6e6fff80 100644 --- a/src/http/Module.h +++ b/src/http/Module.h @@ -33,7 +33,7 @@ #include "nlohmann/json.hpp" // Qserv headers -#include "http/RequestBodyJSON.h" +#include "http/BaseModule.h" // Forward declarations namespace lsst::qserv::http { @@ -43,57 +43,20 @@ class RequestQuery; // This header declarations namespace lsst::qserv::http { -/// The enumeration type which is used for configuring/enforcing -/// module's authorization requirements. -enum class AuthType { REQUIRED, NONE }; - -/// Class AuthError represent exceptions thrown when the authorization -/// requirements aren't met. -class AuthError : public std::invalid_argument { -public: - using std::invalid_argument::invalid_argument; -}; - /** - * Class Module is the very base class for the request processing modules of the HTTP servers. + * Class Module is a specialization of the class BaseModule serving as an intermediate + * base class for the simple request processing modules of the HTTP servers. Modules + * in this hierarchy do not allow uploading files or any other data in the streaming mode. */ -class Module { +class Module : public BaseModule { public: Module() = delete; Module(Module const&) = delete; Module& operator=(Module const&) = delete; virtual ~Module() = default; - - /** - * Invokes a subclass-specific request processing provided by implementations - * of the pure virtual method Module::executeImpl(). The current method - * would also do an optional processing of exceptions thrown by the subclass-specific - * implementations of method Module::executeImpl(). These error conditions will - * be reported to as errors to callers. - * - * @param subModuleName this optional parameter allows modules to have - * multiple sub-modules.
A value of this parameter will be forwarded to - * the subclass-specific implementation of the pure virtual method - * Module::executeImpl(). - * @param authType Authorization requirements of the module. If 'http::AuthType::REQUIRED' is - * requested then the method will enforce the authorization. A lack of required - * authorization key in a request, or an incorrect value of such key would result - * in a error sent back to a client. - * - * @note For requests with 'http::AuthType::REQUIRED' authorization keys must be sent - * by a requestor in the body of a request. There are two types of keys. The normal - * authorization level key "auth_key" is required for most operations resulting - * in any changes made to a persistent or transient states of Qserv, and its - * Replication/Ingest systems. The key is also required when requesting sensitive - * information from the system. The "administrator"-level "admin_auth_key" superseeds - * "auth_key" by adding elevated privileges to requests. If "admin_auth_key" is found - * in the body then "auth_key" (if any provided) will be ignored, and it won't be - * validated if present. It's up to a specific module to decide on how to (or if) - * use the administrative privileges. - */ - void execute(std::string const& subModuleName = std::string(), - http::AuthType const authType = http::AuthType::NONE); + virtual void execute(std::string const& subModuleName = std::string(), + http::AuthType const authType = http::AuthType::NONE); protected: /** @@ -102,84 +65,6 @@ class Module { */ Module(std::string const& authKey, std::string const& adminAuthKey); - /// @return Authorization level of the request. - bool isAdmin() const { return _isAdmin; } - - /// @return The method of a request. - virtual std::string method() const = 0; - - /// @return Captured URL path elements. - virtual std::unordered_map params() const = 0; - - /// @return Parameters of the request's query captured from the request's URL. 
- virtual RequestQuery query() const = 0; - - /// @return Optional parameters of a request extracted from the request's body (if any). - RequestBodyJSON const& body() const { return _body; } - - // Message loggers for the corresponding log levels - - void info(std::string const& msg) const; - void info(std::string const& context, std::string const& msg) const { info(context + " " + msg); } - - void debug(std::string const& msg) const; - void debug(std::string const& context, std::string const& msg) const { debug(context + " " + msg); } - - void warn(std::string const& msg) const; - void warn(std::string const& context, std::string const& msg) const { warn(context + " " + msg); } - - void error(std::string const& msg) const; - void error(std::string const& context, std::string const& msg) const { error(context + " " + msg); } - - /** - * @return A context in which a module runs. This is used for error adn info reporting. - * The method is required to be implemented by a subclass. - */ - virtual std::string context() const = 0; - - /** - * @brief Check the API version in the request's query or its body. - * - * The version is specified in the optional attribute 'version'. If the attribute - * was found present in the request then its value would be required to be within - * the specified minimum and the implied maximum, that's the current version number - * of the REST API. In case if no version info was found in the request the method - * will simply note this and the service will report a lack of the version number - * in the "warning" attribute at the returned JSON object. - * - * The method will look for th eversion attribute in the query string of the "GET" - * requests. For requests that are called using methods "POST", "PUT" or "DELETE" - * the attribute will be located in the requests's body. 
- * - * @note Services that are calling the method should adjust the minimum version - * number to be the same as the current value in the implementation of - * http::MetaModule::version if the expected JSON schema of the corresponding - * request changes. - * @see http::MetaModule::version - * - * @param func The name of the calling context (it's used for error reporting). - * @param minVersion The minimum version number of the valid version range. - * @param warning The optional warning to be sent to a client along with the usual - * error if the minimum version requirement won't be satisfied. This mechanism - * allows REST serivices to notify clients on possible problems encountered - * when validating parameters of a request. - * - * @throw http::Error if a value of the attribute is not within the expected range. - */ - void checkApiVersion(std::string const& func, unsigned int minVersion, - std::string const& warning = std::string()) const; - - /** - * @brief Check if the specified identifier of the Qserv instance that was received - * from a client matches the one that is required in the service context. Throw - * an exception in case of mismatch. - * - * @param func The name of the calling context (it's used for error reporting). - * @param requiredInstanceId An instance identifier required in the service context. - * @throws std::invalid_argument If the dentifiers didn't match. - */ - void enforceInstanceId(std::string const& func, std::string const& requiredInstanceId) const; - /** * Get the raw body of a request if it's available and if the content type * meets expectations. @@ -201,13 +86,6 @@ class Module { */ virtual nlohmann::json executeImpl(std::string const& subModuleName) = 0; - /** - * Send a response back to a requester of a service. - * @param content The content to be sent back. - * @param contentType The type of the content to be sent back. 
- */ - virtual void sendResponse(std::string const& content, std::string const& contentType) = 0; - private: /** * Pull the raw request body and translate it into a JSON object. @@ -215,50 +93,6 @@ class Module { * type is "application/json". Otherwise the body will be left empty. */ void _parseRequestBodyJSON(); - - /** - * Inspect the body of a request or a presence of a user-supplied authorization key. - * Its value will be compared against a value of the corresponding configuration - * parameter of the service (processorConfig) passed into the constructor of the class. - * In the absence of the message body, or in the absence of the key in the body, or - * in case of any mismatch between the keys would result in an exception thrown. - * - * @throw AuthError This exception is thrown if the authorization requirements weren't met. - */ - void _enforceAuthorization(); - - /** - * Report a error condition and send an error message back to a requester - * of a service. - * - * @param func The name of a context from which the operation was initiated. - * @param errorMsg An error condition to be reported. - * @param errorExt (optional) The additional information on the error. - */ - void _sendError(std::string const& func, std::string const& errorMsg, - nlohmann::json const& errorExt = nlohmann::json::object()); - - /** - * Report a result back to a requester of a service upon its successful - * completion. - * @param result A JSON object to be sent back. - */ - void _sendData(nlohmann::json& result); - - // Input parameters - - std::string const _authKey; - std::string const _adminAuthKey; - - /// The flag indicating if a request has been granted the "administrator"-level privileges. - bool _isAdmin = false; - - /// The body of a request is initialized by Module::execute(). - RequestBodyJSON _body; - - /// The optional warning message to be sent to a caller if the API version - /// number wasn't mentoned in the request. 
- mutable std::list _warnings; }; } // namespace lsst::qserv::http From 21f283562ead8571b11d8eaf97d463493b8fe106 Mon Sep 17 00:00:00 2001 From: Igor Gaponenko Date: Mon, 16 Sep 2024 12:28:01 -0700 Subject: [PATCH 5/7] Added an intermediate base class for implementing file uploading requests --- src/http/CMakeLists.txt | 1 + src/http/FileUploadModule.cc | 111 +++++++++++++++++++++++++ src/http/FileUploadModule.h | 157 +++++++++++++++++++++++++++++++++++ 3 files changed, 269 insertions(+) create mode 100644 src/http/FileUploadModule.cc create mode 100644 src/http/FileUploadModule.h diff --git a/src/http/CMakeLists.txt b/src/http/CMakeLists.txt index f6fb8f45d..454d4ab88 100644 --- a/src/http/CMakeLists.txt +++ b/src/http/CMakeLists.txt @@ -11,6 +11,7 @@ target_sources(http PRIVATE ClientConfig.cc ClientConnPool.cc Exceptions.cc + FileUploadModule.cc MetaModule.cc Method.cc Module.cc diff --git a/src/http/FileUploadModule.cc b/src/http/FileUploadModule.cc new file mode 100644 index 000000000..ca1c4a0f7 --- /dev/null +++ b/src/http/FileUploadModule.cc @@ -0,0 +1,111 @@ +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . 
+ */ + +// Class header +#include "http/FileUploadModule.h" + +// System headers +#include + +// Third-party headers +#include + +// Qserv headers +#include "http/Exceptions.h" +#include "http/RequestQuery.h" + +using namespace std; +using json = nlohmann::json; + +namespace lsst::qserv::http { + +FileUploadModule::FileUploadModule(string const& authKey, string const& adminAuthKey, + httplib::Request const& req, httplib::Response& resp, + httplib::ContentReader const& contentReader) + : BaseModule(authKey, adminAuthKey), _req(req), _resp(resp), _contentReader(contentReader) {} + +void FileUploadModule::execute(string const& subModuleName, http::AuthType const authType) { + _subModuleName = subModuleName; + try { + if (!_req.is_multipart_form_data()) { + throw AuthError(context() + "the request is not a multipart form data"); + } + unique_ptr currentFile; + auto const processEndOfEntry = [&]() { + if (currentFile != nullptr) { + if (!currentFile->filename.empty()) { + onEndOfFile(); + } else { + body().objJson[currentFile->name] = currentFile->content; + } + } + }; + _contentReader( + [&](httplib::MultipartFormData const& file) -> bool { + processEndOfEntry(); + if (!file.filename.empty()) { + onStartOfFile(file.name, file.filename, file.content_type); + } + currentFile.reset(new httplib::MultipartFormData(file)); + return true; + }, + [&](char const* data, size_t length) -> bool { + if (currentFile->filename.empty()) { + currentFile->content.append(data, length); + } else { + onFileData(data, length); + } + return true; + }); + processEndOfEntry(); + json result = onEndOfBody(); + sendData(result); + } catch (AuthError const& ex) { + sendError(__func__, "failed to pass authorization requirements, ex: " + string(ex.what())); + } catch (http::Error const& ex) { + sendError(ex.func(), ex.what(), ex.errorExt()); + } catch (invalid_argument const& ex) { + sendError(__func__, "invalid parameters of the request, ex: " + string(ex.what())); + } catch (exception const& ex) 
{ + sendError(__func__, "operation failed due to: " + string(ex.what())); + } +} +string FileUploadModule::method() const { return _req.method; } + +unordered_map FileUploadModule::params() const { return _req.path_params; } + +RequestQuery FileUploadModule::query() const { + // TODO: The query parameters in CPP-HTTPLIB are stored in the std::multimap + // container to allow accumulating values of non-unique keys. For now we need + // to convert the multimap to the std::unordered_map container. This may result + // in losing some query parameters if they have the same key but different values. + // Though, the correct solution is to fix the QHTTP library to support + // the std::multimap container for query parameters. + unordered_map queryParams; + for (auto const& [key, value] : _req.params) queryParams[key] = value; + return RequestQuery(queryParams); +} + +void FileUploadModule::sendResponse(string const& content, string const& contentType) { + _resp.set_content(content, contentType); +} + +} // namespace lsst::qserv::http diff --git a/src/http/FileUploadModule.h b/src/http/FileUploadModule.h new file mode 100644 index 000000000..8d013935c --- /dev/null +++ b/src/http/FileUploadModule.h @@ -0,0 +1,157 @@ +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. 
+ * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ +#ifndef LSST_QSERV_HTTP_FILEUPLOADMODULE_H +#define LSST_QSERV_HTTP_FILEUPLOADMODULE_H + +// System headers +#include +#include + +// Third party headers +#include "nlohmann/json.hpp" + +// Qserv headers +#include "http/BaseModule.h" + +// Forward declarations + +namespace httplib { +class ContentReader; +class Request; +class Response; +} // namespace httplib + +namespace lsst::qserv::http { +class RequestQuery; +} // namespace lsst::qserv::http + +// This header declarations +namespace lsst::qserv::http { + +/** + * Class FileUploadModule is an extended base class specialized for constructing + * the CPP-HTTPLIB file uploading/processing modules. The uploading is expected + * to be done in a streaming mode. The class is abstract and is expected to be subclassed + * to implement the actual file uploading/processing logic. + * + * The class defines the following protocol allowing to handle 0 or many files: + * @code + * onStartOfFile \ + * onFileData \ + * .. * + * onFileData / + * onEndOfFile / + * + * onStartOfFile \ + * onFileData \ + * .. * + * onFileData / + * onEndOfFile / + * + * .. + * + * onEndOfBody + * @endcode + * The call of the onEndOfBody() method is expected to prepare the JSON object + * to be returned to the client. This is the only method that is guaranteed to be called + * once for each request, even if no files were sent in the request. + * + * @note Note a role of the parameter "subModuleName". The parameter is used to specify + * a name of a sub-module to be executed. It's up to the subclass to interpret the parameter + * and to decide what to do with it. 
+ */ +class FileUploadModule : public BaseModule { +public: + FileUploadModule() = delete; + FileUploadModule(FileUploadModule const&) = delete; + FileUploadModule& operator=(FileUploadModule const&) = delete; + + virtual ~FileUploadModule() = default; + virtual void execute(std::string const& subModuleName = std::string(), + http::AuthType const authType = http::AuthType::NONE); + +protected: + /** + * @param authKey An authorization key for operations which require extra security. + * @param adminAuthKey An administrator-level authorization key. + * @param req The HTTP request. + * @param resp The HTTP response channel. + */ + FileUploadModule(std::string const& authKey, std::string const& adminAuthKey, httplib::Request const& req, + httplib::Response& resp, httplib::ContentReader const& contentReader); + + httplib::Request const& req() { return _req; } + httplib::Response& resp() { return _resp; } + std::string const& subModuleName() const { return _subModuleName; } + + // These methods implement the BaseModule's pure virtual methods. + + virtual std::string method() const; + virtual std::unordered_map params() const; + virtual RequestQuery query() const; + virtual void sendResponse(std::string const& content, std::string const& contentType); + + // The following methods are required to be implemented by the subclasses + // to handle the file uploading. The methods are expected to throw exceptions + // for any problem encountered while evaluating a context of a request, or if + // the corresponding operations couldn't be accomplished. + + /** + * Is called when a file is found in the request. + * @param name The name of a parameter associated with the file. + * @param fileName The name of the file to be opened. + * @param contentType The content type of the file. + */ + virtual void onStartOfFile(std::string const& name, std::string const& fileName, + std::string const& contentType) = 0; + + /** + * Is called when the next portion of the file data is available.
The method may + * be called 0 or multiple times for a single file while the data is being uploaded. + * @param data The data of the file. + * @param length The length of the data. + */ + virtual void onFileData(char const* data, size_t length) = 0; + + /** + * Is called when the file parsing is finished. + */ + virtual void onEndOfFile() = 0; + + /** + * Is called when the body parsing is finished. This is the last call of the + * file uploading protocol. + * @return The JSON object to be sent back to the client. + */ + virtual nlohmann::json onEndOfBody() = 0; + +private: + // Input parameters + httplib::Request const& _req; + httplib::Response& _resp; + httplib::ContentReader const& _contentReader; + + std::string _subModuleName; ///< The name of the sub-module to be executed. +}; + +} // namespace lsst::qserv::http + +#endif // LSST_QSERV_HTTP_FILEUPLOADMODULE_H From 24a35fd01ce9fbe5515fd4c1bc5c6684985e9515 Mon Sep 17 00:00:00 2001 From: Igor Gaponenko Date: Tue, 17 Sep 2024 18:51:51 -0700 Subject: [PATCH 6/7] A utility for parsing CSV dialect parameters of the ingest requests --- src/replica/ingest/CMakeLists.txt | 1 + src/replica/ingest/IngestHttpSvcMod.cc | 19 +-------- src/replica/ingest/IngestUtils.cc | 54 +++++++++++++++++++++++++ src/replica/ingest/IngestUtils.h | 55 ++++++++++++++++++++++++++ 4 files changed, 112 insertions(+), 17 deletions(-) create mode 100644 src/replica/ingest/IngestUtils.cc create mode 100644 src/replica/ingest/IngestUtils.h diff --git a/src/replica/ingest/CMakeLists.txt b/src/replica/ingest/CMakeLists.txt index 8e9b1bd15..5d6b99a97 100644 --- a/src/replica/ingest/CMakeLists.txt +++ b/src/replica/ingest/CMakeLists.txt @@ -13,6 +13,7 @@ target_sources(replica_ingest PRIVATE IngestResourceMgrT.cc IngestSvc.cc IngestSvcConn.cc + IngestUtils.cc TransactionContrib.cc ) target_link_libraries(replica_ingest PUBLIC diff --git a/src/replica/ingest/IngestHttpSvcMod.cc b/src/replica/ingest/IngestHttpSvcMod.cc index a922d89dd..718533a72 100644 
--- a/src/replica/ingest/IngestHttpSvcMod.cc +++ b/src/replica/ingest/IngestHttpSvcMod.cc @@ -26,6 +26,7 @@ #include "http/Method.h" #include "replica/ingest/IngestRequest.h" #include "replica/ingest/IngestRequestMgr.h" +#include "replica/ingest/IngestUtils.h" #include "replica/services/ServiceProvider.h" #include "replica/util/Csv.h" @@ -183,23 +184,7 @@ shared_ptr IngestHttpSvcMod::_createRequest(bool async) const { string const url = body().required("url"); string const charsetName = body().optional("charset_name", config->get("worker", "ingest-charset-name")); - - csv::DialectInput dialectInput; - // Allow an empty string in the input. Simply replace the one (if present) with - // the corresponding default value of the parameter. - auto const getDialectParam = [&](string const& param, string const& defaultValue) -> string { - string val = body().optional(param, defaultValue); - if (val.empty()) val = defaultValue; - return val; - }; - dialectInput.fieldsTerminatedBy = - getDialectParam("fields_terminated_by", csv::Dialect::defaultFieldsTerminatedBy); - dialectInput.fieldsEnclosedBy = - getDialectParam("fields_enclosed_by", csv::Dialect::defaultFieldsEnclosedBy); - dialectInput.fieldsEscapedBy = getDialectParam("fields_escaped_by", csv::Dialect::defaultFieldsEscapedBy); - dialectInput.linesTerminatedBy = - getDialectParam("lines_terminated_by", csv::Dialect::defaultLinesTerminatedBy); - + auto const dialectInput = parseDialectInput(body()); auto const httpMethod = http::string2method(body().optional("http_method", "GET")); string const httpData = body().optional("http_data", string()); vector const httpHeaders = body().optionalColl("http_headers", vector()); diff --git a/src/replica/ingest/IngestUtils.cc b/src/replica/ingest/IngestUtils.cc new file mode 100644 index 000000000..d7c776f37 --- /dev/null +++ b/src/replica/ingest/IngestUtils.cc @@ -0,0 +1,54 @@ +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST 
Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ + +// Class header +#include "replica/ingest/IngestUtils.h" + +// Qserv headers +#include "http/RequestBodyJSON.h" +#include "replica/util/Csv.h" + +using namespace std; + +namespace lsst::qserv::replica { + +csv::DialectInput parseDialectInput(http::RequestBodyJSON const& body) { + csv::DialectInput dialectInput; + + // Allow an empty string in the input. Simply replace the one (if present) with + // the corresponding default value of the parameter. 
+ auto const getDialectParam = [&](string const& param, string const& defaultValue) -> string { + string val = body.optional(param, defaultValue); + if (val.empty()) val = defaultValue; + return val; + }; + dialectInput.fieldsTerminatedBy = + getDialectParam("fields_terminated_by", csv::Dialect::defaultFieldsTerminatedBy); + dialectInput.fieldsEnclosedBy = + getDialectParam("fields_enclosed_by", csv::Dialect::defaultFieldsEnclosedBy); + dialectInput.fieldsEscapedBy = getDialectParam("fields_escaped_by", csv::Dialect::defaultFieldsEscapedBy); + dialectInput.linesTerminatedBy = + getDialectParam("lines_terminated_by", csv::Dialect::defaultLinesTerminatedBy); + + return dialectInput; +} + +} // namespace lsst::qserv::replica \ No newline at end of file diff --git a/src/replica/ingest/IngestUtils.h b/src/replica/ingest/IngestUtils.h new file mode 100644 index 000000000..3db98ec1f --- /dev/null +++ b/src/replica/ingest/IngestUtils.h @@ -0,0 +1,55 @@ +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . 
+ */ +#ifndef LSST_QSERV_REPLICA_INGESTUTILS_H +#define LSST_QSERV_REPLICA_INGESTUTILS_H + +// System headers +#include + +// Third party headers +#include "nlohmann/json.hpp" + +// Qserv headers +#include "http/RequestBodyJSON.h" + +// Forward declarations + +namespace lsst::qserv::http { +class RequestBodyJSON; +} // namespace lsst::qserv::http + +namespace lsst::qserv::replica::csv { +class DialectInput; +} // namespace lsst::qserv::replica::csv + +// This header declarations +namespace lsst::qserv::replica { + +/** + * Parse the dialect input from the request body. + * @param body The request body. + * @return The parsed dialect input. + */ +csv::DialectInput parseDialectInput(http::RequestBodyJSON const& body); + +} // namespace lsst::qserv::replica + +#endif // LSST_QSERV_REPLICA_INGESTUTILS_H From 8ae7c0b4738d01ef03d3f74adb2e2f1f55637944 Mon Sep 17 00:00:00 2001 From: Igor Gaponenko Date: Mon, 16 Sep 2024 18:46:37 -0700 Subject: [PATCH 7/7] Push ingest mode for contributions sent in the multipart/form-data body --- src/http/FileUploadModule.cc | 1 + src/http/FileUploadModule.h | 2 +- src/replica/ingest/CMakeLists.txt | 1 + src/replica/ingest/IngestFileHttpSvcMod.cc | 251 +++++++++++++++++++++ src/replica/ingest/IngestFileHttpSvcMod.h | 111 +++++++++ src/replica/ingest/IngestHttpSvc.cc | 5 + 6 files changed, 370 insertions(+), 1 deletion(-) create mode 100644 src/replica/ingest/IngestFileHttpSvcMod.cc create mode 100644 src/replica/ingest/IngestFileHttpSvcMod.h diff --git a/src/http/FileUploadModule.cc b/src/http/FileUploadModule.cc index ca1c4a0f7..8a5a705a2 100644 --- a/src/http/FileUploadModule.cc +++ b/src/http/FileUploadModule.cc @@ -62,6 +62,7 @@ void FileUploadModule::execute(string const& subModuleName, http::AuthType const [&](httplib::MultipartFormData const& file) -> bool { processEndOfEntry(); if (!file.filename.empty()) { + enforceAuthorization(authType); onStartOfFile(file.name, file.filename, file.content_type); } currentFile.reset(new 
httplib::MultipartFormData(file)); diff --git a/src/http/FileUploadModule.h b/src/http/FileUploadModule.h index 8d013935c..86b2d0021 100644 --- a/src/http/FileUploadModule.h +++ b/src/http/FileUploadModule.h @@ -129,7 +129,7 @@ class FileUploadModule : public BaseModule { * @param data The data of the file. * @param length The length of the data. */ - virtual void onFileData(char const* data, size_t length) = 0; + virtual void onFileData(char const* data, std::size_t length) = 0; /** * Is called when the file parsing is finished. diff --git a/src/replica/ingest/CMakeLists.txt b/src/replica/ingest/CMakeLists.txt index 5d6b99a97..053ebb82b 100644 --- a/src/replica/ingest/CMakeLists.txt +++ b/src/replica/ingest/CMakeLists.txt @@ -3,6 +3,7 @@ add_dependencies(replica_ingest replica_proto) target_sources(replica_ingest PRIVATE IngestClient.cc IngestDataHttpSvcMod.cc + IngestFileHttpSvcMod.cc IngestFileSvc.cc IngestHttpSvc.cc IngestHttpSvcMod.cc diff --git a/src/replica/ingest/IngestFileHttpSvcMod.cc b/src/replica/ingest/IngestFileHttpSvcMod.cc new file mode 100644 index 000000000..6fb76d16d --- /dev/null +++ b/src/replica/ingest/IngestFileHttpSvcMod.cc @@ -0,0 +1,251 @@ +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . 
+ */
+
+// Class header
+#include "replica/ingest/IngestFileHttpSvcMod.h"
+
+// Qserv header
+#include "http/BinaryEncoding.h"
+#include "http/Exceptions.h"
+#include "http/Url.h"
+#include "replica/config/Configuration.h"
+#include "replica/ingest/IngestUtils.h"
+#include "replica/services/DatabaseServices.h"
+#include "replica/services/ServiceProvider.h"
+#include "replica/util/Csv.h"
+
+// System headers
+#include <cerrno>
+#include <memory>
+#include <stdexcept>
+#include <string>
+
+// Third party headers
+#include "httplib.h"
+
+using namespace std;
+using json = nlohmann::json;
+
+namespace lsst::qserv::replica {
+
+// Entry point of the module: build a module bound to the request/response pair
+// and run it. The module has no sub-modules, hence the empty sub-module name.
+void IngestFileHttpSvcMod::process(shared_ptr<ServiceProvider> const& serviceProvider,
+                                   string const& workerName, httplib::Request const& req,
+                                   httplib::Response& resp, httplib::ContentReader const& contentReader,
+                                   http::AuthType const authType) {
+    IngestFileHttpSvcMod module(serviceProvider, workerName, req, resp, contentReader);
+    string const subModuleName;
+    module.execute(subModuleName, authType);
+}
+
+// The authorization keys of the upload module are taken from the service provider.
+IngestFileHttpSvcMod::IngestFileHttpSvcMod(shared_ptr<ServiceProvider> const& serviceProvider,
+                                           string const& workerName, httplib::Request const& req,
+                                           httplib::Response& resp,
+                                           httplib::ContentReader const& contentReader)
+        : http::FileUploadModule(serviceProvider->authKey(), serviceProvider->adminAuthKey(), req, resp,
+                                 contentReader),
+          IngestFileSvc(serviceProvider, workerName) {}
+
+// The prefix used in the log messages and the error reports of the module.
+string IngestFileHttpSvcMod::context() const { return "INGEST-FILE-HTTP-SVC "; }
+
+// Called when a file entry is found in the request body. The method fills the
+// contribution descriptor from the request parameters, validates the transaction
+// and the CSV dialect, registers the contribution and opens the temporary file.
+// Only one file per request is accepted. Failures detected after the transaction
+// context is known are recorded in the contribution before the exception is thrown.
+void IngestFileHttpSvcMod::onStartOfFile(string const& name, string const& fileName,
+                                         string const& contentType) {
+    debug(__func__);
+    checkApiVersion(__func__, 38);
+
+    auto const context_ = context() + __func__;
+    auto const config = serviceProvider()->config();
+    auto const databaseServices = serviceProvider()->databaseServices();
+
+    if (isOpen()) {
+        throw http::Error(context_, "a file is already opened");
+    }
+    if (!_contrib.tmpFile.empty()) {
+        throw http::Error(context_, "the service only allows one file per request");
+    }
+
+    // Fill out parameters in the contribution descriptor. This information is needed
+    // for bookkeeping and monitoring purposes. The descriptor's state will be kept
+    // updated in the Replication/Ingest's database as the contribution processing
+    // will be happening.
+    _contrib.transactionId = body().requiredUInt("transaction_id");
+    _contrib.table = body().required<string>("table");
+    _contrib.chunk = body().requiredUInt("chunk");
+    _contrib.isOverlap = body().requiredUInt("overlap") != 0;
+    _contrib.worker = workerName();
+
+    // To indicate the file contents was streamed directly into the service
+    _contrib.url = "data-csv://" + req().remote_addr + "/" + fileName;
+    _contrib.charsetName =
+            body().optional<string>("charset_name", config->get<string>("worker", "ingest-charset-name"));
+    _contrib.dialectInput = parseDialectInput(body());
+
+    // Retries are allowed before an attempt to load data into MySQL. When such attempt
+    // is made the persistent state of the destination table is supposed to be changed.
+    _contrib.retryAllowed = true;
+
+    // This parameter sets a limit for the number of warnings (should there be any)
+    // reported by MySQL after contribution loading attempt. Warnings is an important
+    // mechanism for debugging problems with the ingested data.
+    _contrib.maxNumWarnings = body().optionalUInt(
+            "max_num_warnings", config->get<unsigned int>("worker", "loader-max-warnings"));
+
+    debug(__func__, "transaction_id: " + to_string(_contrib.transactionId));
+    debug(__func__, "table: '" + _contrib.table + "'");
+    debug(__func__, "chunk: " + to_string(_contrib.chunk));
+    debug(__func__, "overlap: " + string(_contrib.isOverlap ? "1" : "0"));
+    debug(__func__, "charset_name: '" + _contrib.charsetName + "'");
+    debug(__func__, "max_num_warnings: " + to_string(_contrib.maxNumWarnings));
+
+    // Attempts to pass invalid transaction identifiers or tables are not recorded
+    // as transaction contributions in the persistent state of the Replication/Ingest
+    // system since it's impossible to determine a context of these operations.
+    // The following operations will throw exceptions should any problems with
+    // validating a context of the request be encountered.
+    TransactionInfo const trans = databaseServices->transaction(_contrib.transactionId);
+    _contrib.database = trans.database;
+
+    DatabaseInfo const database = config->databaseInfo(_contrib.database);
+    TableInfo const table = database.findTable(_contrib.table);
+
+    // Prescreen parameters of the request to ensure they're valid in the given
+    // context. Check the state of the transaction. Refuse to proceed with the request
+    // if any issues were detected.
+
+    bool const failed = true;
+
+    if (trans.state != TransactionInfo::State::STARTED) {
+        _contrib.error = context_ + " transactionId=" + to_string(_contrib.transactionId) + " is not active";
+        _contrib = databaseServices->createdTransactionContrib(_contrib, failed);
+        // Pass the caller's context (not the error text) to keep the log message well formed.
+        _failed(context_);
+        throw http::Error(context_, _contrib.error);
+    }
+
+    csv::Dialect dialect;
+    try {
+        http::Url const resource(_contrib.url);
+        if (resource.scheme() != http::Url::DATA_CSV) {
+            throw invalid_argument(context_ + " unsupported url '" + _contrib.url + "'");
+        }
+        dialect = csv::Dialect(_contrib.dialectInput);
+        _parser = make_unique<csv::Parser>(dialect);
+    } catch (exception const& ex) {
+        _contrib.error = ex.what();
+        _contrib = databaseServices->createdTransactionContrib(_contrib, failed);
+        _failed(context_);
+        throw;
+    }
+
+    // Register the contribution
+    _contrib = databaseServices->createdTransactionContrib(_contrib);
+
+    // This is where the actual processing of the request begins.
+    try {
+        _contrib.tmpFile = openFile(_contrib.transactionId, _contrib.table, dialect, _contrib.charsetName,
+                                    _contrib.chunk, _contrib.isOverlap);
+        _contrib = databaseServices->startedTransactionContrib(_contrib);
+    } catch (http::Error const& ex) {
+        json const errorExt = ex.errorExt();
+        if (!errorExt.empty()) {
+            _contrib.httpError = errorExt["http_error"];
+            _contrib.systemError = errorExt["system_error"];
+            _contrib.retryAllowed = errorExt["retry_allowed"].get<int>() != 0;
+        }
+        _contrib.error = ex.what();
+        _contrib = databaseServices->startedTransactionContrib(_contrib, failed);
+        _failed(context_);
+        throw;
+    } catch (exception const& ex) {
+        _contrib.systemError = errno;
+        _contrib.error = ex.what();
+        _contrib = databaseServices->startedTransactionContrib(_contrib, failed);
+        _failed(context_);
+        throw;
+    }
+}
+
+// Called for each portion of the file payload. The data are parsed and
+// the resulting rows are written into the temporary file.
+void IngestFileHttpSvcMod::onFileData(char const* data, size_t length) {
+    auto const context_ = context() + __func__;
+    if (!isOpen()) {
+        throw http::Error(context_, "no file was opened");
+    }
+    _parseAndWriteData(data, length, false);
+}
+
+// Called after the last portion of the file payload was received. The parser
+// is flushed, the contribution is loaded into the destination table and
+// the temporary file is closed.
+void IngestFileHttpSvcMod::onEndOfFile() {
+    auto const context_ = context() + __func__;
+    if (!isOpen()) {
+        throw http::Error(context_, "no file was opened");
+    }
+
+    // Flush the parser to ensure the last row (if any) has been written
+    // into the output file. No new input is passed at this stage, hence
+    // the empty buffer (a zero-length array is not valid in standard C++).
+    char const* const noData = "";
+    size_t const length = 0;
+    _parseAndWriteData(noData, length, true);
+
+    // Report that processing of the input data and preparing the contribution file is over.
+    auto const databaseServices = serviceProvider()->databaseServices();
+    _contrib = databaseServices->readTransactionContrib(_contrib);
+
+    // Finished reading and preprocessing the input file.
+    // Begin making irreversible changes to the destination table.
+    _contrib.retryAllowed = false;
+    try {
+        loadDataIntoTable(_contrib.maxNumWarnings);
+        _contrib.numWarnings = numWarnings();
+        _contrib.warnings = warnings();
+        _contrib.numRowsLoaded = numRowsLoaded();
+        _contrib = databaseServices->loadedTransactionContrib(_contrib);
+        closeFile();
+    } catch (exception const& ex) {
+        // Capture errno first, before any other call gets a chance to overwrite it.
+        _contrib.systemError = errno;
+        _contrib.error = "MySQL load failed, ex: " + string(ex.what());
+        bool const failed = true;
+        databaseServices->loadedTransactionContrib(_contrib, failed);
+        _failed(context_);
+        throw http::Error(context_, _contrib.error);
+    }
+}
+
+// Called after the whole request body was processed. The method returns
+// the JSON object {"contrib": <descriptor>} and throws http::Error if no file
+// was sent in the request or if the file is still open.
+json IngestFileHttpSvcMod::onEndOfBody() {
+    auto const context_ = context() + __func__;
+    if (_contrib.tmpFile.empty()) {
+        throw http::Error(context_, "no file was sent in the request");
+    }
+    if (isOpen()) {
+        throw http::Error(context_, "the file is still open");
+    }
+    return json::object({{"contrib", _contrib.toJson()}});
+}
+
+// Push the input data through the CSV parser. Each row reported by the parser is
+// written into the temporary file and counted. The byte counter is advanced by
+// the size of the input (not of the output).
+void IngestFileHttpSvcMod::_parseAndWriteData(char const* data, size_t length, bool flush) {
+    _parser->parse(data, length, flush, [&](char const* buf, size_t size) {
+        writeRowIntoFile(buf, size);
+        _contrib.numRows++;
+    });
+    _contrib.numBytes += length;  // count unmodified input data
+}
+
+// Log the error stored in the contribution descriptor on behalf of the caller's
+// context and close the temporary file.
+void IngestFileHttpSvcMod::_failed(string const& context_) {
+    error(context_, _contrib.error);
+    closeFile();
+}
+
+}  // namespace lsst::qserv::replica
diff --git a/src/replica/ingest/IngestFileHttpSvcMod.h b/src/replica/ingest/IngestFileHttpSvcMod.h
new file mode 100644
index 000000000..d66654a95
--- /dev/null
+++ b/src/replica/ingest/IngestFileHttpSvcMod.h
@@ -0,0 +1,111 @@
+/*
+ * LSST Data Management System
+ *
+ * This product includes software developed by the
+ * LSST Project (http://www.lsst.org/).
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the LSST License Statement and
+ * the GNU General Public License along with this program. If not,
+ * see <http://www.lsstcorp.org/LegalNotices/>.
+ */
+#ifndef LSST_QSERV_INGESTFILEHTTPSVCMOD_H
+#define LSST_QSERV_INGESTFILEHTTPSVCMOD_H
+
+// System headers
+#include <memory>
+#include <string>
+
+// Third party headers
+#include "nlohmann/json.hpp"
+
+// Qserv headers
+#include "http/FileUploadModule.h"
+#include "replica/ingest/IngestFileSvc.h"
+#include "replica/ingest/TransactionContrib.h"
+
+// Forward declarations
+namespace lsst::qserv::replica {
+class ServiceProvider;
+}  // namespace lsst::qserv::replica
+
+namespace lsst::qserv::replica::csv {
+class Parser;
+}  // namespace lsst::qserv::replica::csv
+
+// This header declarations
+namespace lsst::qserv::replica {
+
+/**
+ * Class IngestFileHttpSvcMod processes chunk/table contribution requests made over HTTP.
+ * The class is used by the HTTP server built into the worker Ingest service.
+ * The current class is meant to be used for ingesting payloads that are pushed directly
+ * into the service over the HTTP protocol in the "multipart/form-data" body of the request.
+ */
+class IngestFileHttpSvcMod : public http::FileUploadModule, public IngestFileSvc {
+public:
+    IngestFileHttpSvcMod() = delete;
+    IngestFileHttpSvcMod(IngestFileHttpSvcMod const&) = delete;
+    IngestFileHttpSvcMod& operator=(IngestFileHttpSvcMod const&) = delete;
+
+    virtual ~IngestFileHttpSvcMod() = default;
+
+    /**
+     * Process a request.
+     *
+     * @param serviceProvider The provider of services is needed to access
+     *   the configuration and the database services.
+     * @param workerName The name of a worker this service is acting upon (used to pull
+     *   worker-specific configuration options for the service).
+     * @param req The HTTP request.
+     * @param resp The HTTP response channel.
+     * @param contentReader The content reader to be used for the file upload.
+     * @param authType The authorization requirements for the module
+     * @note The module has no sub-modules. An empty sub-module name is passed
+     *   to the base class when executing the request.
+     */
+    static void process(std::shared_ptr<ServiceProvider> const& serviceProvider,
+                        std::string const& workerName, httplib::Request const& req, httplib::Response& resp,
+                        httplib::ContentReader const& contentReader,
+                        http::AuthType const authType = http::AuthType::REQUIRED);
+
+protected:
+    /// @return The prefix used in the log messages and the error reports of the module.
+    virtual std::string context() const final;
+
+    /// Validate the request, register the contribution and open the temporary file.
+    /// Only one file per request is allowed.
+    virtual void onStartOfFile(std::string const& name, std::string const& fileName,
+                               std::string const& contentType) final;
+
+    /// Parse the next portion of the file payload and write the rows into the temporary file.
+    virtual void onFileData(char const* data, std::size_t length) final;
+
+    /// Flush the parser, load the contribution into the destination table and close the file.
+    virtual void onEndOfFile() final;
+
+    /// @return The JSON object {"contrib": <descriptor>} describing the contribution.
+    virtual nlohmann::json onEndOfBody() final;
+
+private:
+    /// @see method IngestFileHttpSvcMod::process()
+    IngestFileHttpSvcMod(std::shared_ptr<ServiceProvider> const& serviceProvider,
+                         std::string const& workerName, httplib::Request const& req, httplib::Response& resp,
+                         httplib::ContentReader const& contentReader);
+
+    /**
+     * Push the input data through the CSV parser and write the resulting rows
+     * into the temporary file. Counters of the contribution descriptor are updated.
+     * @param data The pointer to the input data.
+     * @param length The number of bytes in the input data.
+     * @param flush The flag passed to the parser along with the data (set when
+     *   the end of the input is reached).
+     */
+    void _parseAndWriteData(char const* data, std::size_t length, bool flush);
+
+    /**
+     * Close the temporary file if needed and post an error message.
+     * @param context_ The caller's context.
+     */
+    void _failed(std::string const& context_);
+
+    TransactionContribInfo _contrib;  ///< A state of the contribution processing
+
+    /// The parser of the input stream as configured for the CSV dialect reported
+    /// by a client.
+    std::unique_ptr<csv::Parser> _parser;
+};
+
+}  // namespace lsst::qserv::replica
+
+#endif  // LSST_QSERV_INGESTFILEHTTPSVCMOD_H
diff --git a/src/replica/ingest/IngestHttpSvc.cc b/src/replica/ingest/IngestHttpSvc.cc
index 24d816fd7..4583f0180 100644
--- a/src/replica/ingest/IngestHttpSvc.cc
+++ b/src/replica/ingest/IngestHttpSvc.cc
@@ -30,6 +30,7 @@
 #include "http/ChttpMetaModule.h"
 #include "replica/config/Configuration.h"
 #include "replica/ingest/IngestDataHttpSvcMod.h"
+#include "replica/ingest/IngestFileHttpSvcMod.h"
 #include "replica/ingest/IngestHttpSvcMod.h"
 #include "replica/ingest/IngestRequest.h"
 #include "replica/ingest/IngestRequestMgr.h"
@@ -81,6 +82,10 @@ void IngestHttpSvc::registerServices(unique_ptr const& server)
         IngestDataHttpSvcMod::process(self->serviceProvider(), self->_workerName, req, resp,
                                       "SYNC-PROCESS-DATA");
     });
+    server->Post("/ingest/csv", [self](httplib::Request const& req, httplib::Response& resp,
+                                       httplib::ContentReader const& contentReader) {
+        IngestFileHttpSvcMod::process(self->serviceProvider(), self->_workerName, req, resp, contentReader);
+    });
     server->Post("/ingest/file", [self](httplib::Request const& req, httplib::Response& resp) {
         IngestHttpSvcMod::process(self->serviceProvider(), self->_requestMgr, self->_workerName, req, resp,
                                   "SYNC-PROCESS");