Skip to content

Commit

Permalink
Merge branch 'tickets/DM-45929'
Browse files Browse the repository at this point in the history
  • Loading branch information
iagaponenko committed Oct 4, 2024
2 parents 40a0184 + ed1b97e commit e243699
Show file tree
Hide file tree
Showing 23 changed files with 836 additions and 489 deletions.
2 changes: 1 addition & 1 deletion src/admin/python/lsst/qserv/admin/replicationInterface.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ def __init__(
self.repl_ctrl = urlparse(repl_ctrl_uri)
self.auth_key = auth_key
self.admin_auth_key = admin_auth_key
self.repl_api_version = 36
self.repl_api_version = 37
_log.debug(f"ReplicationInterface %s", self.repl_ctrl)

def version(self) -> str:
Expand Down
2 changes: 1 addition & 1 deletion src/http/ChttpMetaModule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ string const adminAuthKey;

namespace lsst::qserv::http {

unsigned int const ChttpMetaModule::version = 36;
unsigned int const ChttpMetaModule::version = 37;

void ChttpMetaModule::process(string const& context, nlohmann::json const& info, httplib::Request const& req,
httplib::Response& resp, string const& subModuleName) {
Expand Down
2 changes: 1 addition & 1 deletion src/http/MetaModule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ string const adminAuthKey;

namespace lsst::qserv::http {

unsigned int const MetaModule::version = 36;
unsigned int const MetaModule::version = 37;

void MetaModule::process(string const& context, nlohmann::json const& info,
shared_ptr<qhttp::Request> const& req, shared_ptr<qhttp::Response> const& resp,
Expand Down
153 changes: 125 additions & 28 deletions src/replica/contr/HttpIngestTransModule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@
#include "replica/jobs/AbortTransactionJob.h"
#include "replica/jobs/DirectorIndexJob.h"
#include "replica/mysql/DatabaseMySQL.h"
#include "replica/services/DatabaseServices.h"
#include "replica/services/ServiceProvider.h"
#include "replica/util/Mutex.h"
#include "replica/util/NamedMutexRegistry.h"
#include "util/String.h"

using namespace std;
using json = nlohmann::json;
Expand Down Expand Up @@ -93,7 +93,7 @@ json HttpIngestTransModule::executeImpl(string const& subModuleName) {

json HttpIngestTransModule::_getTransactions() {
debug(__func__);
checkApiVersion(__func__, 16);
checkApiVersion(__func__, 37);

auto const config = controller()->serviceProvider()->config();
auto const databaseServices = controller()->serviceProvider()->databaseServices();
Expand All @@ -106,6 +106,7 @@ json HttpIngestTransModule::_getTransactions() {
auto const longContribFormat = query().optionalUInt64("contrib_long", 0) != 0;
bool const includeContext = query().optionalUInt64("include_context", 0) != 0;
bool const includeLog = query().optionalUInt64("include_log", 0) != 0;
bool const includeExtensions = query().optionalUInt64("include_extensions", 0) != 0;
bool const includeWarnings = query().optionalUInt64("include_warnings", 0) != 0;
bool const includeRetries = query().optionalUInt64("include_retries", 0) != 0;

Expand All @@ -117,17 +118,24 @@ json HttpIngestTransModule::_getTransactions() {
debug(__func__, "contrib_long=" + bool2str(longContribFormat));
debug(__func__, "include_context=" + bool2str(includeContext));
debug(__func__, "include_log=" + bool2str(includeLog));
debug(__func__, "include_extensions=" + bool2str(includeExtensions));
debug(__func__, "include_warnings=" + bool2str(includeWarnings));
debug(__func__, "include_retries=" + bool2str(includeRetries));

auto const transStateSelector = _parseTransStateSelector("trans_state");
auto const contribStatusSelector = _parseContribStatusSelector("contrib_status");

vector<string> databases;
if (databaseName.empty()) {
databases = config->databases(family, allDatabases, isPublished);
} else {
databases.push_back(databaseName);
}

string const anyTableSelector;
string const anyWorkerSelector;
bool const allWorkers = true;
int const anyChunkSelector = -1;
json result;
result["databases"] = json::object();
for (auto&& databaseName : databases) {
Expand All @@ -138,11 +146,14 @@ json HttpIngestTransModule::_getTransactions() {
result["databases"][database.name]["is_published"] = database.isPublished ? 1 : 0;
result["databases"][database.name]["num_chunks"] = chunks.size();
result["databases"][database.name]["transactions"] = json::array();
for (auto&& transaction : databaseServices->transactions(database.name, includeContext, includeLog)) {
for (auto&& transaction :
databaseServices->transactions(database.name, includeContext, includeLog, transStateSelector)) {
json transJson = transaction.toJson();
if (includeContributions) {
transJson["contrib"] = _getTransactionContributions(transaction, longContribFormat,
includeWarnings, includeRetries);
transJson["contrib"] = _getTransactionContributions(
transaction, anyTableSelector, anyWorkerSelector, contribStatusSelector,
anyChunkSelector, longContribFormat, includeExtensions, includeWarnings,
includeRetries);
}
result["databases"][database.name]["transactions"].push_back(transJson);
}
Expand All @@ -152,41 +163,84 @@ json HttpIngestTransModule::_getTransactions() {

json HttpIngestTransModule::_getTransaction() {
debug(__func__);
checkApiVersion(__func__, 16);
checkApiVersion(__func__, 37);

auto const config = controller()->serviceProvider()->config();
auto const databaseServices = controller()->serviceProvider()->databaseServices();
auto const id = stoul(params().at("id"));
TransactionId const transactionId = stoul(params().at("id"));
auto const databaseName = query().optionalString("database");
auto const tableName = query().optionalString("table");
auto const workerName = query().optionalString("worker");
int const chunkSelector = query().optionalInt("chunk", -1);
auto const includeContributions = query().optionalUInt64("contrib", 0) != 0;
auto const longContribFormat = query().optionalUInt64("contrib_long", 0) != 0;
bool const includeContext = query().optionalUInt64("include_context", 0) != 0;
bool const includeLog = query().optionalUInt64("include_log", 0) != 0;
bool const includeExtensions = query().optionalUInt64("include_extensions", 0) != 0;
bool const includeWarnings = query().optionalUInt64("include_warnings", 0) != 0;
bool const includeRetries = query().optionalUInt64("include_retries", 0) != 0;
size_t const minRetries = query().optionalUInt64("min_retries", 0);
size_t const minWarnings = query().optionalUInt64("min_warnings", 0);
size_t const maxEntries = query().optionalUInt64("max_entries", 0);

debug(__func__, "id=" + to_string(id));
debug(__func__, "id=" + to_string(transactionId));
debug(__func__, "database=" + databaseName);
debug(__func__, "table=" + tableName);
debug(__func__, "worker=" + workerName);
debug(__func__, "chunk=" + to_string(chunkSelector));
debug(__func__, "contrib=" + bool2str(includeContributions));
debug(__func__, "contrib_long=" + bool2str(longContribFormat));
debug(__func__, "include_context=" + bool2str(includeContext));
debug(__func__, "include_log=" + bool2str(includeLog));
debug(__func__, "include_extensions=" + bool2str(includeExtensions));
debug(__func__, "include_warnings=" + bool2str(includeWarnings));
debug(__func__, "include_retries=" + bool2str(includeRetries));
debug(__func__, "min_retries=" + to_string(minRetries));
debug(__func__, "min_warnings=" + to_string(minWarnings));
debug(__func__, "max_entries=" + to_string(maxEntries));

auto const transStateSelector = _parseTransStateSelector("trans_state");
auto const contribStatusSelector = _parseContribStatusSelector("contrib_status");

if (databaseName.empty() && (transactionId == 0)) {
throw http::Error(__func__, "either 'id' or 'database' query parameter must be specified");
}

DatabaseInfo database;
vector<TransactionInfo> transactions;
if (transactionId != 0) {
auto transaction = databaseServices->transaction(transactionId, includeContext, includeLog);
database = config->databaseInfo(transaction.database);
if (!databaseName.empty() && (databaseName != database.name)) {
throw http::Error(__func__, "transaction id=" + to_string(transactionId) +
" is associated with database '" + database.name +
"' which is different from the requested database '" +
databaseName + "'");
}
transactions.push_back(move(transaction));
} else {
database = config->databaseInfo(databaseName);
transactions =
databaseServices->transactions(database.name, includeContext, includeLog, transStateSelector);
}

auto const transaction = databaseServices->transaction(id, includeContext, includeLog);
auto const database = config->databaseInfo(transaction.database);
bool const allWorkers = true;
vector<unsigned int> chunks;
databaseServices->findDatabaseChunks(chunks, transaction.database, allWorkers);
databaseServices->findDatabaseChunks(chunks, database.name, allWorkers);

json transJson = transaction.toJson();
if (includeContributions) {
transJson["contrib"] =
_getTransactionContributions(transaction, longContribFormat, includeWarnings, includeRetries);
}
json result;
result["databases"][transaction.database]["is_published"] = database.isPublished ? 1 : 0;
result["databases"][transaction.database]["num_chunks"] = chunks.size();
result["databases"][transaction.database]["transactions"].push_back(transJson);
result["databases"][database.name]["is_published"] = database.isPublished ? 1 : 0;
result["databases"][database.name]["num_chunks"] = chunks.size();
for (auto&& transaction : transactions) {
json transJson = transaction.toJson();
if (includeContributions) {
transJson["contrib"] = _getTransactionContributions(
transaction, tableName, workerName, contribStatusSelector, chunkSelector,
longContribFormat, includeExtensions, includeWarnings, includeRetries, minRetries,
minWarnings, maxEntries);
}
result["databases"][database.name]["transactions"].push_back(transJson);
}
return result;
}

Expand Down Expand Up @@ -452,18 +506,21 @@ json HttpIngestTransModule::_endTransaction() {

json HttpIngestTransModule::_getContribution() {
debug(__func__);
checkApiVersion(__func__, 16);
checkApiVersion(__func__, 37);

unsigned int const id = stoul(params().at("id"));
bool const includeExtensions = query().optionalUInt64("include_extensions", 1) != 0;
bool const includeWarnings = query().optionalUInt64("include_warnings", 0) != 0;
bool const includeRetries = query().optionalUInt64("include_retries", 0) != 0;

debug(__func__, "id=" + to_string(id));
debug(__func__, "include_extensions=" + bool2str(includeExtensions));
debug(__func__, "include_warnings=" + bool2str(includeWarnings));
debug(__func__, "include_retries=" + bool2str(includeRetries));

auto const databaseServices = controller()->serviceProvider()->databaseServices();
auto const contrib = databaseServices->transactionContrib(id, includeWarnings, includeRetries);
auto const contrib =
databaseServices->transactionContrib(id, includeExtensions, includeWarnings, includeRetries);

json result;
result["contribution"] = contrib.toJson();
Expand Down Expand Up @@ -529,9 +586,11 @@ void HttpIngestTransModule::_removePartitionFromDirectorIndex(DatabaseInfo const
}
}

json HttpIngestTransModule::_getTransactionContributions(TransactionInfo const& transaction,
bool longContribFormat, bool includeWarnings,
bool includeRetries) const {
json HttpIngestTransModule::_getTransactionContributions(
TransactionInfo const& transaction, string const& tableName, string const& workerName,
set<TransactionContribInfo::Status> const& contribStatusSelector, int chunkSelector,
bool longContribFormat, bool includeExtensions, bool includeWarnings, bool includeRetries,
size_t minRetries, size_t minWarnings, size_t maxEntries) const {
auto const config = controller()->serviceProvider()->config();
auto const databaseServices = controller()->serviceProvider()->databaseServices();
DatabaseInfo const database = config->databaseInfo(transaction.database);
Expand Down Expand Up @@ -559,14 +618,13 @@ json HttpIngestTransModule::_getTransactionContributions(TransactionInfo const&

// Default selectors for contributions imply pulling all contributions
// attempted in a scope of the transaction.
string const anyTableSelector;
string const anyWorkerSelector;
TransactionContribInfo::TypeSelector const anyTypeSelector =
TransactionContribInfo::TypeSelector::SYNC_OR_ASYNC;

vector<TransactionContribInfo> const contribs = databaseServices->transactionContribs(
transaction.id, anyTableSelector, anyWorkerSelector, anyTypeSelector,
longContribFormat && includeWarnings, longContribFormat && includeRetries);
transaction.id, tableName, workerName, contribStatusSelector, anyTypeSelector, chunkSelector,
longContribFormat && includeExtensions, longContribFormat && includeWarnings,
longContribFormat && includeRetries, minRetries, minWarnings, maxEntries);

for (auto&& contrib : contribs) {
if (longContribFormat) {
Expand Down Expand Up @@ -701,4 +759,43 @@ json HttpIngestTransModule::_getTransactionContributions(TransactionInfo const&
return resultJson;
}

set<TransactionInfo::State> HttpIngestTransModule::_parseTransStateSelector(string const& param) const {
    // The raw selector is read from the optional query parameter named by 'param'
    // and logged before it gets interpreted.
    auto const selector = query().optionalString(param);
    debug(__func__, param + "=" + selector);

    set<TransactionInfo::State> states;

    // Two negated forms are recognized. Each one selects every known state
    // except the state being named.
    bool const exceptStarted = selector == "!STARTED";
    bool const exceptFinished = selector == "!FINISHED";
    if (exceptStarted || exceptFinished) {
        states = TransactionInfo::allStates;
        states.erase(exceptStarted ? TransactionInfo::State::STARTED : TransactionInfo::State::FINISHED);
        return states;
    }

    // Anything else is treated as a comma-separated list of state names, where
    // empty tokens are skipped. Each name is translated by
    // TransactionInfo::string2state (which, according to the declaration of this
    // method, is expected to reject unknown names with std::invalid_argument).
    auto const names = util::String::split(selector, ",", true /* skipEmpty */);
    for (auto const& name : names) {
        states.insert(TransactionInfo::string2state(name));
    }
    return states;
}

set<TransactionContribInfo::Status> HttpIngestTransModule::_parseContribStatusSelector(
        string const& param) const {
    // Fetch the raw value of the optional query parameter and report it to the log.
    auto const selector = query().optionalString(param);
    debug(__func__, param + "=" + selector);

    set<TransactionContribInfo::Status> statuses;

    // The negated selectors "!IN_PROGRESS" and "!FINISHED" stand for all known
    // statuses minus the one that is named after the exclamation mark.
    bool const exceptInProgress = selector == "!IN_PROGRESS";
    bool const exceptFinished = selector == "!FINISHED";
    if (exceptInProgress || exceptFinished) {
        statuses = TransactionContribInfo::allStatuses;
        statuses.erase(exceptInProgress ? TransactionContribInfo::Status::IN_PROGRESS
                                        : TransactionContribInfo::Status::FINISHED);
        return statuses;
    }

    // Otherwise the value is a comma-separated list of status names (empty
    // tokens are skipped). Each name is translated by
    // TransactionContribInfo::str2status (which, according to the declaration of
    // this method, is expected to reject unknown names with std::invalid_argument).
    auto const names = util::String::split(selector, ",", true /* skipEmpty */);
    for (auto const& name : names) {
        statuses.insert(TransactionContribInfo::str2status(name));
    }
    return statuses;
}

} // namespace lsst::qserv::replica
41 changes: 39 additions & 2 deletions src/replica/contr/HttpIngestTransModule.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@

// Qserv headers
#include "replica/contr/HttpModule.h"
#include "replica/ingest/TransactionContrib.h"
#include "replica/services/DatabaseServices.h"
#include "replica/util/Common.h"

// Forward declarations
Expand Down Expand Up @@ -130,18 +132,53 @@ class HttpIngestTransModule : public HttpModule {
/**
* Extract contributions into a transaction.
* @param transaction A transaction defining a scope of the request.
* @param tableName The name of a table to pull contributions from (if empty then all tables will be
* assumed).
* @param workerName The name of a worker to pull contributions from (if empty then all workers will be
* assumed).
* @param contribStatus A set of the contribution statuses to filter the contributions by (all
* contributions if empty).
* @param chunkSelector The chunk selector to filter the contributions by (all contributions if -1).
* @param longContribFormat If 'true' then the method will also return info on
* the individual file contributions rather than just the summary info.
* @param includeExtensions If 'true' then include info on the contribution extensions. Note that
* this option is ignored if longContribFormat == false.
* @param includeWarnings If 'true' then include info on the MySQL warnings
* if any were captured after LOAD DATA INFILE. Note that this option is
* ignored if longContribFormat == false.
* @param includeRetries If 'true' then include info on the failed retries
* if any were made when reading the input data of the contributions. Note that
* this option is ignored if longContribFormat == false.
* @param minRetries The minimum number of retries for a contribution to be included in
* the response (0 for all).
* @param minWarnings The minimum number of warnings for a contribution to be included in
* the response (0 for all).
* @param maxEntries The maximum number of contributions to return (0 for all).
* @return A JSON object.
*/
nlohmann::json _getTransactionContributions(TransactionInfo const& transaction, bool longContribFormat,
bool includeWarnings, bool includeRetries) const;
nlohmann::json _getTransactionContributions(TransactionInfo const& transaction,
std::string const& tableName, std::string const& workerName,
std::set<TransactionContribInfo::Status> const& contribStatus,
int chunkSelector, bool longContribFormat,
bool includeExtensions, bool includeWarnings,
bool includeRetries, size_t minRetries = 0,
size_t minWarnings = 0, size_t maxEntries = 0) const;

/**
* Parse a string representation of the transaction state.
* @param param The name of the query parameter to parse.
* @return A set of the transaction states (empty if the parameter is not set).
* @throws std::invalid_argument If the string didn't match any known code.
*/
std::set<TransactionInfo::State> _parseTransStateSelector(std::string const& param) const;

/**
* Parse a string representation of the contribution status.
* @param param The name of the query parameter to parse.
* @return A set of the contribution statuses (empty if the parameter is not set).
* @throws std::invalid_argument If the string didn't match any known code.
*/
std::set<TransactionContribInfo::Status> _parseContribStatusSelector(std::string const& param) const;

/// Named mutexes are used for acquiring exclusive transient locks on the transaction
/// management operations performed by the module.
Expand Down
Loading

0 comments on commit e243699

Please sign in to comment.