Skip to content

Commit

Permalink
The HTTP-based backend of the Replication worker services
Browse files Browse the repository at this point in the history
  • Loading branch information
iagaponenko committed Dec 17, 2024
1 parent aac3938 commit c7e2b18
Show file tree
Hide file tree
Showing 33 changed files with 5,552 additions and 30 deletions.
5 changes: 4 additions & 1 deletion src/replica/apps/WorkerApp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
#include "replica/services/ServiceProvider.h"
#include "replica/util/FileUtils.h"
#include "replica/worker/FileServer.h"
#include "replica/worker/WorkerProcessor.h"
#include "replica/worker/WorkerHttpSvc.h"
#include "replica/worker/WorkerServer.h"

// LSST headers
Expand Down Expand Up @@ -113,6 +113,9 @@ int WorkerApp::runImpl() {
auto const reqProcSvr = WorkerServer::create(serviceProvider(), worker);
thread reqProcSvrThread([reqProcSvr]() { reqProcSvr->run(); });

auto const reqProcHttpSvr = WorkerHttpSvc::create(serviceProvider(), worker);
thread reqProcHttpSvrThread([reqProcHttpSvr]() { reqProcHttpSvr->run(); });

auto const fileSvr = FileServer::create(serviceProvider(), worker);
thread fileSvrThread([fileSvr]() { fileSvr->run(); });

Expand Down
1 change: 1 addition & 0 deletions src/replica/proto/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ add_library(replica_proto OBJECT)
target_sources(replica_proto PRIVATE
${REPLICA_PB_SRCS}
${REPLICA_PB_HDRS}
Protocol.cc
)
171 changes: 171 additions & 0 deletions src/replica/proto/Protocol.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/*
* LSST Data Management System
*
* This product includes software developed by the
* LSST Project (http://www.lsst.org/).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the LSST License Statement and
* the GNU General Public License along with this program. If not,
* see <http://www.lsstcorp.org/LegalNotices/>.
*/

// Class header
#include "replica/proto/Protocol.h"

// System headers
#include <stdexcept>

using namespace std;

namespace lsst::qserv::replica::protocol {

string toString(SqlRequestType status) {
switch (status) {
case SqlRequestType::QUERY:
return "QUERY";
case SqlRequestType::CREATE_DATABASE:
return "CREATE_DATABASE";
case SqlRequestType::DROP_DATABASE:
return "DROP_DATABASE";
case SqlRequestType::ENABLE_DATABASE:
return "ENABLE_DATABASE";
case SqlRequestType::DISABLE_DATABASE:
return "DISABLE_DATABASE";
case SqlRequestType::GRANT_ACCESS:
return "GRANT_ACCESS";
case SqlRequestType::CREATE_TABLE:
return "CREATE_TABLE";
case SqlRequestType::DROP_TABLE:
return "DROP_TABLE";
case SqlRequestType::REMOVE_TABLE_PARTITIONING:
return "REMOVE_TABLE_PARTITIONING";
case SqlRequestType::DROP_TABLE_PARTITION:
return "DROP_TABLE_PARTITION";
case SqlRequestType::GET_TABLE_INDEX:
return "GET_TABLE_INDEX";
case SqlRequestType::CREATE_TABLE_INDEX:
return "CREATE_TABLE_INDEX";
case SqlRequestType::DROP_TABLE_INDEX:
return "DROP_TABLE_INDEX";
case SqlRequestType::ALTER_TABLE:
return "ALTER_TABLE";
case SqlRequestType::TABLE_ROW_STATS:
return "TABLE_ROW_STATS";
default:
throw logic_error("Unhandled SQL request type: " + to_string(static_cast<int>(status)));
}
}

string toString(Status status) {
switch (status) {
case Status::CREATED:
return "CREATED";
case Status::SUCCESS:
return "SUCCESS";
case Status::QUEUED:
return "QUEUED";
case Status::IN_PROGRESS:
return "IN_PROGRESS";
case Status::IS_CANCELLING:
return "IS_CANCELLING";
case Status::BAD:
return "BAD";
case Status::FAILED:
return "FAILED";
case Status::CANCELLED:
return "CANCELLED";
default:
throw logic_error("Unhandled status: " + to_string(static_cast<int>(status)));
}
}

string toString(StatusExt extendedStatus) {
switch (extendedStatus) {
case StatusExt::NONE:
return "NONE";
case StatusExt::INVALID_PARAM:
return "INVALID_PARAM";
case StatusExt::INVALID_ID:
return "INVALID_ID";
case StatusExt::FOLDER_STAT:
return "FOLDER_STAT";
case StatusExt::FOLDER_CREATE:
return "FOLDER_CREATE";
case StatusExt::FILE_STAT:
return "FILE_STAT";
case StatusExt::FILE_SIZE:
return "FILE_SIZE";
case StatusExt::FOLDER_READ:
return "FOLDER_READ";
case StatusExt::FILE_READ:
return "FILE_READ";
case StatusExt::FILE_ROPEN:
return "FILE_ROPEN";
case StatusExt::FILE_CREATE:
return "FILE_CREATE";
case StatusExt::FILE_OPEN:
return "FILE_OPEN";
case StatusExt::FILE_RESIZE:
return "FILE_RESIZE";
case StatusExt::FILE_WRITE:
return "FILE_WRITE";
case StatusExt::FILE_COPY:
return "FILE_COPY";
case StatusExt::FILE_DELETE:
return "FILE_DELETE";
case StatusExt::FILE_RENAME:
return "FILE_RENAME";
case StatusExt::FILE_EXISTS:
return "FILE_EXISTS";
case StatusExt::SPACE_REQ:
return "SPACE_REQ";
case StatusExt::NO_FOLDER:
return "NO_FOLDER";
case StatusExt::NO_FILE:
return "NO_FILE";
case StatusExt::NO_ACCESS:
return "NO_ACCESS";
case StatusExt::NO_SPACE:
return "NO_SPACE";
case StatusExt::FILE_MTIME:
return "FILE_MTIME";
case StatusExt::MYSQL_ERROR:
return "MYSQL_ERROR";
case StatusExt::LARGE_RESULT:
return "LARGE_RESULT";
case StatusExt::NO_SUCH_TABLE:
return "NO_SUCH_TABLE";
case StatusExt::NOT_PARTITIONED_TABLE:
return "NOT_PARTITIONED_TABLE";
case StatusExt::NO_SUCH_PARTITION:
return "NO_SUCH_PARTITION";
case StatusExt::MULTIPLE:
return "MULTIPLE";
case StatusExt::OTHER_EXCEPTION:
return "OTHER_EXCEPTION";
case StatusExt::FOREIGN_INSTANCE:
return "FOREIGN_INSTANCE";
case StatusExt::DUPLICATE_KEY:
return "DUPLICATE_KEY";
case StatusExt::CANT_DROP_KEY:
return "CANT_DROP_KEY";
default:
throw logic_error("Unhandled extended status: " + to_string(static_cast<int>(extendedStatus)));
}
}

string toString(Status status, StatusExt extendedStatus) {
return toString(status) + "::" + toString(extendedStatus);
}

} // namespace lsst::qserv::replica::protocol
139 changes: 139 additions & 0 deletions src/replica/proto/Protocol.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* LSST Data Management System
*
* This product includes software developed by the
* LSST Project (http://www.lsst.org/).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the LSST License Statement and
* the GNU General Public License along with this program. If not,
* see <http://www.lsstcorp.org/LegalNotices/>.
*/
#ifndef LSST_QSERV_REPLICA_PROTOCOL_H
#define LSST_QSERV_REPLICA_PROTOCOL_H

// System headers
#include <string>

// Third party headers
#include "nlohmann/json.hpp"

// This header declarations
namespace lsst::qserv::replica::protocol {

/// Subtypes of the SQL requests.
enum class SqlRequestType : int {

QUERY = 0,
CREATE_DATABASE = 1,
DROP_DATABASE = 2,
ENABLE_DATABASE = 3, ///< in Qserv
DISABLE_DATABASE = 4, ///< in Qserv
GRANT_ACCESS = 5,
CREATE_TABLE = 6,
DROP_TABLE = 7,
REMOVE_TABLE_PARTITIONING = 8,
DROP_TABLE_PARTITION = 9,
GET_TABLE_INDEX = 10,
CREATE_TABLE_INDEX = 11,
DROP_TABLE_INDEX = 12,
ALTER_TABLE = 13,
TABLE_ROW_STATS = 14
};

/// @return the string representation of the SQL request type
std::string toString(SqlRequestType status);

/// Types of the table indexes specified in the index management requests requests.
enum class SqlIndexSpec : int { DEFAULT = 1, UNIQUE = 2, FULLTEXT = 3, SPATIAL = 4 };

/// Status values returned by all request related to operations with
/// replicas. Request management operations always return messages whose types
/// match the return types of the corresponding (original) replica-related requests.
/// Service management requests have their own set of status values.
///
enum class Status : int {
CREATED = 0,
SUCCESS = 1,
QUEUED = 2,
IN_PROGRESS = 3,
IS_CANCELLING = 4,
BAD = 5,
FAILED = 6,
CANCELLED = 7
};

enum class StatusExt : int {
NONE = 0, ///< Unspecified problem.
INVALID_PARAM = 1, ///< Invalid parameter(s) of a request.
INVALID_ID = 2, ///< An invalid request identifier.
FOLDER_STAT = 4, ///< Failed to obtain fstat() for a folder.
FOLDER_CREATE = 5, ///< Failed to create a folder.
FILE_STAT = 6, ///< Failed to obtain fstat() for a file.
FILE_SIZE = 7, ///< Failed to obtain a size of a file.
FOLDER_READ = 8, ///< Failed to read the contents of a folder.
FILE_READ = 9, ///< Failed to read the contents of a file.
FILE_ROPEN = 10, ///< Failed to open a remote file.
FILE_CREATE = 11, ///< Failed to create a file.
FILE_OPEN = 12, ///< Failed to open a file.
FILE_RESIZE = 13, ///< Failed to resize a file.
FILE_WRITE = 14, ///< Failed to write into a file.
FILE_COPY = 15, ///< Failed to copy a file.
FILE_DELETE = 16, ///< Failed to delete a file.
FILE_RENAME = 17, ///< Failed to rename a file.
FILE_EXISTS = 18, ///< File already exists.
SPACE_REQ = 19, ///< Space availability check failed.
NO_FOLDER = 20, ///< Folder doesn't exist.
NO_FILE = 21, ///< File doesn't exist.
NO_ACCESS = 22, ///< No access to a file or a folder.
NO_SPACE = 23, ///< No space left on a device as required by an operation.
FILE_MTIME = 24, ///< Get/set 'mtime' operation failed.
MYSQL_ERROR = 25, ///< General MySQL error (other than any specific ones listed here).
LARGE_RESULT = 26, ///< Result exceeds a limit set in a request.
NO_SUCH_TABLE = 27, ///< No table found while performing a MySQL operation.
NOT_PARTITIONED_TABLE = 28, ///< The table is not MySQL partitioned as it was expected.
NO_SUCH_PARTITION = 29, ///< No MySQL partition found in a table as it was expected.
MULTIPLE = 30, ///< Multiple unspecified errors encountered when processing a request.
OTHER_EXCEPTION = 31, ///< Other exception not listed here.
FOREIGN_INSTANCE = 32, ///< Detected a request from a Controller serving an unrelated Qserv.
DUPLICATE_KEY = 33, ///< Duplicate key found when creating an index or altering a table schema.
CANT_DROP_KEY = 34 ///< Can't drop a field or a key which doesn't exist.
};

/// @return the string representation of the status
std::string toString(Status status);

/// @return the string representation of the extended status
std::string toString(StatusExt extendedStatus);

/// @return the string representation of the full status
std::string toString(Status status, StatusExt extendedStatus);

/// Status of a replica.
enum class ReplicaStatus : int { NOT_FOUND = 0, CORRUPT = 1, INCOMPLETE = 2, COMPLETE = 3 };

/// Status of a service.
enum class ServiceState : int { SUSPEND_IN_PROGRESS = 0, SUSPENDED = 1, RUNNING = 2 };

/// The header to be sent with the requests processed through the worker's queueing system.
struct QueuedRequestHdr {
std::string id;
int priority;
unsigned int timeout;
QueuedRequestHdr(std::string const& id_, int priority_, unsigned int timeout_)
: id(id_), priority(priority_), timeout(timeout_) {}
nlohmann::json toJson() const { return {{"id", id}, {"priority", priority}, {"timeout", timeout}}; };
};

} // namespace lsst::qserv::replica::protocol

#endif // LSST_QSERV_REPLICA_PROTOCOL_H
40 changes: 38 additions & 2 deletions src/replica/util/Common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@
#include "boost/uuid/uuid.hpp"
#include "boost/uuid/uuid_generators.hpp"
#include "boost/uuid/uuid_io.hpp"
#include "nlohmann/json.hpp"

using namespace std;
using namespace nlohmann;
using json = nlohmann::json;

namespace lsst::qserv::replica {

Expand Down Expand Up @@ -80,6 +79,43 @@ string Generators::uniqueId() {
return boost::uuids::to_string(id);
}

///////////////////////////////////////////
// SqlColDef //
///////////////////////////////////////////

list<SqlColDef> parseSqlColumns(json const& columnsJsonArray) {
if (!columnsJsonArray.is_array()) {
throw invalid_argument("lsst::qserv::replica::" + string(__func__) +
" columnsJsonArray is not an array");
}
list<SqlColDef> columns;
for (auto const& column : columnsJsonArray) {
columns.emplace_back(column.at("name"), column.at("type"));
}
return columns;
}

///////////////////////////////////////////
// SqlIndexDef //
///////////////////////////////////////////

SqlIndexDef::SqlIndexDef(json const& indexSpecJson) {
if (!indexSpecJson.is_object()) {
throw invalid_argument("lsst::qserv::replica::" + string(__func__) +
" indexSpecJson is not an object");
}
spec = indexSpecJson.value("spec", "DEFAULT");
name = indexSpecJson.at("name");
comment = indexSpecJson.value("comment", "");
auto const keysJsonArray = indexSpecJson.at("keys");
if (!keysJsonArray.is_array()) {
throw invalid_argument("lsst::qserv::replica::" + string(__func__) + " keys is not an array");
}
for (auto const& key : keysJsonArray) {
keys.emplace_back(key.at("name"), key.at("length"), key.at("ascending"));
}
}

////////////////////////////////////////////
// Parameters of requests //
////////////////////////////////////////////
Expand Down
Loading

0 comments on commit c7e2b18

Please sign in to comment.