diff --git a/src/admin/python/lsst/qserv/admin/replicationInterface.py b/src/admin/python/lsst/qserv/admin/replicationInterface.py index 147ff7a90..02e92cff1 100644 --- a/src/admin/python/lsst/qserv/admin/replicationInterface.py +++ b/src/admin/python/lsst/qserv/admin/replicationInterface.py @@ -201,7 +201,7 @@ def __init__( self.repl_ctrl = urlparse(repl_ctrl_uri) self.auth_key = auth_key self.admin_auth_key = admin_auth_key - self.repl_api_version = 36 + self.repl_api_version = 37 _log.debug(f"ReplicationInterface %s", self.repl_ctrl) def version(self) -> str: diff --git a/src/http/ChttpMetaModule.cc b/src/http/ChttpMetaModule.cc index 19de4dad6..2794a21e0 100644 --- a/src/http/ChttpMetaModule.cc +++ b/src/http/ChttpMetaModule.cc @@ -37,7 +37,7 @@ string const adminAuthKey; namespace lsst::qserv::http { -unsigned int const ChttpMetaModule::version = 36; +unsigned int const ChttpMetaModule::version = 37; void ChttpMetaModule::process(string const& context, nlohmann::json const& info, httplib::Request const& req, httplib::Response& resp, string const& subModuleName) { diff --git a/src/http/MetaModule.cc b/src/http/MetaModule.cc index f7f4f57ac..194f79b39 100644 --- a/src/http/MetaModule.cc +++ b/src/http/MetaModule.cc @@ -37,7 +37,7 @@ string const adminAuthKey; namespace lsst::qserv::http { -unsigned int const MetaModule::version = 36; +unsigned int const MetaModule::version = 37; void MetaModule::process(string const& context, nlohmann::json const& info, shared_ptr const& req, shared_ptr const& resp, diff --git a/src/replica/contr/HttpIngestTransModule.cc b/src/replica/contr/HttpIngestTransModule.cc index 5f34462d4..1140e8202 100644 --- a/src/replica/contr/HttpIngestTransModule.cc +++ b/src/replica/contr/HttpIngestTransModule.cc @@ -38,10 +38,10 @@ #include "replica/jobs/AbortTransactionJob.h" #include "replica/jobs/DirectorIndexJob.h" #include "replica/mysql/DatabaseMySQL.h" -#include "replica/services/DatabaseServices.h" #include "replica/services/ServiceProvider.h" #include "replica/util/Mutex.h" #include "replica/util/NamedMutexRegistry.h" +#include "util/String.h" using namespace std; using json = nlohmann::json; @@ -93,7 +93,7 @@ json HttpIngestTransModule::executeImpl(string const& subModuleName) { json HttpIngestTransModule::_getTransactions() { debug(__func__); - checkApiVersion(__func__, 16); + checkApiVersion(__func__, 37); auto const config = controller()->serviceProvider()->config(); auto const databaseServices = controller()->serviceProvider()->databaseServices(); @@ -106,6 +106,7 @@ json HttpIngestTransModule::_getTransactions() { auto const longContribFormat = query().optionalUInt64("contrib_long", 0) != 0; bool const includeContext = query().optionalUInt64("include_context", 0) != 0; bool const includeLog = query().optionalUInt64("include_log", 0) != 0; + bool const includeExtensions = query().optionalUInt64("include_extensions", 0) != 0; bool const includeWarnings = query().optionalUInt64("include_warnings", 0) != 0; bool const includeRetries = query().optionalUInt64("include_retries", 0) != 0; @@ -117,9 +118,13 @@ json HttpIngestTransModule::_getTransactions() { debug(__func__, "contrib_long=" + bool2str(longContribFormat)); debug(__func__, "include_context=" + bool2str(includeContext)); debug(__func__, "include_log=" + bool2str(includeLog)); + debug(__func__, "include_extensions=" + bool2str(includeExtensions)); debug(__func__, "include_warnings=" + bool2str(includeWarnings)); debug(__func__, "include_retries=" + bool2str(includeRetries)); + auto const transStateSelector = _parseTransStateSelector("trans_state"); + auto const contribStatusSelector = _parseContribStatusSelector("contrib_status"); + vector databases; if (databaseName.empty()) { databases = config->databases(family, allDatabases, isPublished); @@ -127,7 +132,10 @@ json HttpIngestTransModule::_getTransactions() { databases.push_back(databaseName); } + string const anyTableSelector; + string const anyWorkerSelector; bool const allWorkers = true; + int const anyChunkSelector = -1; json result; result["databases"] = json::object(); for (auto&& databaseName : databases) { @@ -138,11 +146,14 @@ json HttpIngestTransModule::_getTransactions() { result["databases"][database.name]["is_published"] = database.isPublished ? 1 : 0; result["databases"][database.name]["num_chunks"] = chunks.size(); result["databases"][database.name]["transactions"] = json::array(); - for (auto&& transaction : databaseServices->transactions(database.name, includeContext, includeLog)) { + for (auto&& transaction : + databaseServices->transactions(database.name, includeContext, includeLog, transStateSelector)) { json transJson = transaction.toJson(); if (includeContributions) { - transJson["contrib"] = _getTransactionContributions(transaction, longContribFormat, - includeWarnings, includeRetries); + transJson["contrib"] = _getTransactionContributions( + transaction, anyTableSelector, anyWorkerSelector, contribStatusSelector, + anyChunkSelector, longContribFormat, includeExtensions, includeWarnings, + includeRetries); } result["databases"][database.name]["transactions"].push_back(transJson); } @@ -152,41 +163,84 @@ json HttpIngestTransModule::_getTransactions() { json HttpIngestTransModule::_getTransaction() { debug(__func__); - checkApiVersion(__func__, 16); + checkApiVersion(__func__, 37); auto const config = controller()->serviceProvider()->config(); auto const databaseServices = controller()->serviceProvider()->databaseServices(); - auto const id = stoul(params().at("id")); + TransactionId const transactionId = stoul(params().at("id")); + auto const databaseName = query().optionalString("database"); + auto const tableName = query().optionalString("table"); + auto const workerName = query().optionalString("worker"); + int const chunkSelector = query().optionalInt("chunk", -1); auto const includeContributions = query().optionalUInt64("contrib", 0) != 0; auto const longContribFormat = query().optionalUInt64("contrib_long", 0) != 0; bool const includeContext = query().optionalUInt64("include_context", 0) != 0; bool const includeLog = query().optionalUInt64("include_log", 0) != 0; + bool const includeExtensions = query().optionalUInt64("include_extensions", 0) != 0; bool const includeWarnings = query().optionalUInt64("include_warnings", 0) != 0; bool const includeRetries = query().optionalUInt64("include_retries", 0) != 0; + size_t const minRetries = query().optionalUInt64("min_retries", 0); + size_t const minWarnings = query().optionalUInt64("min_warnings", 0); + size_t const maxEntries = query().optionalUInt64("max_entries", 0); - debug(__func__, "id=" + to_string(id)); + debug(__func__, "id=" + to_string(transactionId)); + debug(__func__, "database=" + databaseName); + debug(__func__, "table=" + tableName); + debug(__func__, "worker=" + workerName); + debug(__func__, "chunk=" + to_string(chunkSelector)); debug(__func__, "contrib=" + bool2str(includeContributions)); debug(__func__, "contrib_long=" + bool2str(longContribFormat)); debug(__func__, "include_context=" + bool2str(includeContext)); debug(__func__, "include_log=" + bool2str(includeLog)); + debug(__func__, "include_extensions=" + bool2str(includeExtensions)); debug(__func__, "include_warnings=" + bool2str(includeWarnings)); debug(__func__, "include_retries=" + bool2str(includeRetries)); + debug(__func__, "min_retries=" + to_string(minRetries)); + debug(__func__, "min_warnings=" + to_string(minWarnings)); + debug(__func__, "max_entries=" + to_string(maxEntries)); + + auto const transStateSelector = _parseTransStateSelector("trans_state"); + auto const contribStatusSelector = _parseContribStatusSelector("contrib_status"); + + if (databaseName.empty() && (transactionId == 0)) { + throw http::Error(__func__, "either 'id' or 'database' query parameter must be specified"); + } + + DatabaseInfo database; + vector transactions; + if (transactionId != 0) { + auto transaction = databaseServices->transaction(transactionId, includeContext, includeLog); + database = config->databaseInfo(transaction.database); + if (!databaseName.empty() && (databaseName != database.name)) { + throw http::Error(__func__, "transaction id=" + to_string(transactionId) + + " is associated with database '" + database.name + + "' which is different from the requested database '" + + databaseName + "'"); + } + transactions.push_back(move(transaction)); + } else { + database = config->databaseInfo(databaseName); + transactions = + databaseServices->transactions(database.name, includeContext, includeLog, transStateSelector); + } - auto const transaction = databaseServices->transaction(id, includeContext, includeLog); - auto const database = config->databaseInfo(transaction.database); bool const allWorkers = true; vector chunks; - databaseServices->findDatabaseChunks(chunks, transaction.database, allWorkers); + databaseServices->findDatabaseChunks(chunks, database.name, allWorkers); - json transJson = transaction.toJson(); - if (includeContributions) { - transJson["contrib"] = - _getTransactionContributions(transaction, longContribFormat, includeWarnings, includeRetries); - } json result; - result["databases"][transaction.database]["is_published"] = database.isPublished ? 1 : 0; - result["databases"][transaction.database]["num_chunks"] = chunks.size(); - result["databases"][transaction.database]["transactions"].push_back(transJson); + result["databases"][database.name]["is_published"] = database.isPublished ? 1 : 0; + result["databases"][database.name]["num_chunks"] = chunks.size(); + for (auto&& transaction : transactions) { + json transJson = transaction.toJson(); + if (includeContributions) { + transJson["contrib"] = _getTransactionContributions( + transaction, tableName, workerName, contribStatusSelector, chunkSelector, + longContribFormat, includeExtensions, includeWarnings, includeRetries, minRetries, + minWarnings, maxEntries); + } + result["databases"][database.name]["transactions"].push_back(transJson); + } return result; } @@ -452,18 +506,21 @@ json HttpIngestTransModule::_endTransaction() { json HttpIngestTransModule::_getContribution() { debug(__func__); - checkApiVersion(__func__, 16); + checkApiVersion(__func__, 37); unsigned int const id = stoul(params().at("id")); + bool const includeExtensions = query().optionalUInt64("include_extensions", 1) != 0; bool const includeWarnings = query().optionalUInt64("include_warnings", 0) != 0; bool const includeRetries = query().optionalUInt64("include_retries", 0) != 0; debug(__func__, "id=" + to_string(id)); + debug(__func__, "include_extensions=" + bool2str(includeExtensions)); debug(__func__, "include_warnings=" + bool2str(includeWarnings)); debug(__func__, "include_retries=" + bool2str(includeRetries)); auto const databaseServices = controller()->serviceProvider()->databaseServices(); - auto const contrib = databaseServices->transactionContrib(id, includeWarnings, includeRetries); + auto const contrib = + databaseServices->transactionContrib(id, includeExtensions, includeWarnings, includeRetries); json result; result["contribution"] = contrib.toJson(); @@ -529,9 +586,11 @@ void HttpIngestTransModule::_removePartitionFromDirectorIndex(DatabaseInfo const } } -json HttpIngestTransModule::_getTransactionContributions(TransactionInfo const& transaction, - bool longContribFormat, bool includeWarnings, - bool includeRetries) const { +json HttpIngestTransModule::_getTransactionContributions( + TransactionInfo const& transaction, string const& tableName, string const& workerName, + set const& contribStatusSelector, int chunkSelector, + bool longContribFormat, bool includeExtensions, bool includeWarnings, bool includeRetries, + size_t minRetries, size_t minWarnings, size_t maxEntries) const { auto const config = controller()->serviceProvider()->config(); auto const databaseServices = controller()->serviceProvider()->databaseServices(); DatabaseInfo const database = config->databaseInfo(transaction.database); @@ -559,14 +618,13 @@ json HttpIngestTransModule::_getTransactionContributions(TransactionInfo const& // Default selectors for contributions imply pulling all contributions // attempted in a scope of the transaction. - string const anyTableSelector; - string const anyWorkerSelector; TransactionContribInfo::TypeSelector const anyTypeSelector = TransactionContribInfo::TypeSelector::SYNC_OR_ASYNC; vector const contribs = databaseServices->transactionContribs( - transaction.id, anyTableSelector, anyWorkerSelector, anyTypeSelector, - longContribFormat && includeWarnings, longContribFormat && includeRetries); + transaction.id, tableName, workerName, contribStatusSelector, anyTypeSelector, chunkSelector, + longContribFormat && includeExtensions, longContribFormat && includeWarnings, + longContribFormat && includeRetries, minRetries, minWarnings, maxEntries); for (auto&& contrib : contribs) { if (longContribFormat) { @@ -701,4 +759,43 @@ json HttpIngestTransModule::_getTransactionContributions(TransactionInfo const& return resultJson; } +set HttpIngestTransModule::_parseTransStateSelector(string const& param) const { + set result; + auto const stateStr = query().optionalString(param); + debug(__func__, param + "=" + stateStr); + if (stateStr == "!STARTED") { + result = TransactionInfo::allStates; + result.erase(TransactionInfo::State::STARTED); + } else if (stateStr == "!FINISHED") { + result = TransactionInfo::allStates; + result.erase(TransactionInfo::State::FINISHED); + } else { + bool const skipEmpty = true; + for (auto const& str : util::String::split(stateStr, ",", skipEmpty)) { + result.insert(TransactionInfo::string2state(str)); + } + } + return result; +} + +set HttpIngestTransModule::_parseContribStatusSelector( + string const& param) const { + set result; + auto const statusStr = query().optionalString(param); + debug(__func__, param + "=" + statusStr); + if (statusStr == "!IN_PROGRESS") { + result = TransactionContribInfo::allStatuses; + result.erase(TransactionContribInfo::Status::IN_PROGRESS); + } else if (statusStr == "!FINISHED") { + result = TransactionContribInfo::allStatuses; + result.erase(TransactionContribInfo::Status::FINISHED); + } else { + bool const skipEmpty = true; + for (auto const& str : util::String::split(statusStr, ",", skipEmpty)) { + result.insert(TransactionContribInfo::str2status(str)); + } + } + return result; +} + } // namespace lsst::qserv::replica diff --git a/src/replica/contr/HttpIngestTransModule.h b/src/replica/contr/HttpIngestTransModule.h index 948a931f1..8d94fb646 100644 --- a/src/replica/contr/HttpIngestTransModule.h +++ b/src/replica/contr/HttpIngestTransModule.h @@ -30,6 +30,8 @@ // Qserv headers #include "replica/contr/HttpModule.h" +#include "replica/ingest/TransactionContrib.h" +#include "replica/services/DatabaseServices.h" #include "replica/util/Common.h" // Forward declarations @@ -130,18 +132,53 @@ class HttpIngestTransModule : public HttpModule { /** * Extract contributions into a transaction. * @param transaction A transaction defining a scope of the request. + * @param tableName The name of a table to pull contributions from (if empty then all tables will be + * assumed). + * @param workerName The name of a worker to pull contributions from (if empty then all workers will be + * assumed). + * @param contribStatus A set of the contribution statuses to filter the contributions by (all + * contributions if empty). + * @param chunkSelector The chunk selector to filter the contributions by (all contributions if -1). * @param longContribFormat If 'true' then the method will also return info on * the individual file contributions rather than just the summary info. + * @param includeExtensions If 'true' then include info on the contributions extensions. Note that + * this option is ignored if longContribFormat == false. * @param includeWarnings If 'true' then include info on the MySQL warnings * if any were captured after LOAD DATA INFILE. Note that this option is * ignored if longContribFormat == false. * @param includeRetries If 'true' then include info on the failed retries * if any were made when reading the input data of the contributions. Note that * this option is ignored if longContribFormat == false. + * @param minRetries The minimum number of retries for a contribution to be included in + * the response (0 for all). + * @param minWarnings The minimum number of warnings for a contribution to include in + * the response (0 for all). + * @param maxEntries The maximum number of contributions to return (0 for all). * @return A JSON object. */ - nlohmann::json _getTransactionContributions(TransactionInfo const& transaction, bool longContribFormat, - bool includeWarnings, bool includeRetries) const; + nlohmann::json _getTransactionContributions(TransactionInfo const& transaction, + std::string const& tableName, std::string const& workerName, + std::set const& contribStatus, + int chunkSelector, bool longContribFormat, + bool includeExtensions, bool includeWarnings, + bool includeRetries, size_t minRetries = 0, + size_t minWarnings = 0, size_t maxEntries = 0) const; + + /** + * Parse a string representation of the transaction state. + * @param param The name of the query parameter to parse. + * @return A set of the transaction states (empty if the parameter is not set). + * @throws std::invalid_argument If the string didn't match any known code. + */ + std::set _parseTransStateSelector(std::string const& param) const; + + /** + * Parse a string representation of the contribution status. + * @param param The name of the query parameter to parse. + * @return A set of the contribution statuses (empty if the parameter is not set). + * @throws std::invalid_argument If the string didn't match any known code. + */ + std::set _parseContribStatusSelector(std::string const& param) const; /// Named mutexes are used for acquiring exclusive transient locks on the transaction /// management operations performed by the module. diff --git a/src/replica/ingest/IngestHttpSvcMod.cc b/src/replica/ingest/IngestHttpSvcMod.cc index bd6ab43f0..a922d89dd 100644 --- a/src/replica/ingest/IngestHttpSvcMod.cc +++ b/src/replica/ingest/IngestHttpSvcMod.cc @@ -140,8 +140,9 @@ json IngestHttpSvcMod::_asyncTransRequests() const { TransactionId const transactionId = stoul(params().at("id")); string const anyTable; + set const anyStatus; auto const contribs = _serviceProvider->databaseServices()->transactionContribs( - transactionId, anyTable, _workerName, TransactionContribInfo::TypeSelector::ASYNC); + transactionId, anyTable, _workerName, anyStatus, TransactionContribInfo::TypeSelector::ASYNC); json contribsJson = json::array(); for (auto& contrib : contribs) { contribsJson.push_back(contrib.toJson()); @@ -155,8 +156,9 @@ json IngestHttpSvcMod::_asyncTransCancelRequests() const { TransactionId const transactionId = stoul(params().at("id")); string const anyTable; + set const anyStatus; auto const contribs = _serviceProvider->databaseServices()->transactionContribs( - transactionId, anyTable, _workerName, TransactionContribInfo::TypeSelector::ASYNC); + transactionId, anyTable, _workerName, anyStatus, TransactionContribInfo::TypeSelector::ASYNC); json contribsJson = json::array(); for (auto& contrib : contribs) { try { diff --git a/src/replica/ingest/IngestRequestMgr.cc b/src/replica/ingest/IngestRequestMgr.cc index 798e2883b..b35d247a5 100644 --- a/src/replica/ingest/IngestRequestMgr.cc +++ b/src/replica/ingest/IngestRequestMgr.cc @@ -74,7 +74,7 @@ shared_ptr IngestRequestMgr::create(shared_ptrtransactions(TransactionInfo::State::STARTED); for (auto const& trans : transactions) { auto const contribs = databaseServices->transactionContribs( - trans.id, TransactionContribInfo::Status::IN_PROGRESS, anyTable, workerName, + trans.id, anyTable, workerName, {TransactionContribInfo::Status::IN_PROGRESS}, TransactionContribInfo::TypeSelector::ASYNC); contribsByCreateTimeDESC.insert(contribsByCreateTimeDESC.end(), contribs.cbegin(), contribs.cend()); } diff --git a/src/replica/ingest/TransactionContrib.cc b/src/replica/ingest/TransactionContrib.cc index e623af074..43ec21b4b 100644 --- a/src/replica/ingest/TransactionContrib.cc +++ b/src/replica/ingest/TransactionContrib.cc @@ -71,7 +71,21 @@ vector const TransactionContribInfo::_transactio TransactionContribInfo::Status::LOAD_FAILED, TransactionContribInfo::Status::CANCELLED, TransactionContribInfo::Status::FINISHED}; -string const& TransactionContribInfo::status2str(TransactionContribInfo::Status status) { +set const TransactionContribInfo::allStatuses = { + TransactionContribInfo::Status::IN_PROGRESS, TransactionContribInfo::Status::CREATE_FAILED, + TransactionContribInfo::Status::START_FAILED, TransactionContribInfo::Status::READ_FAILED, + TransactionContribInfo::Status::LOAD_FAILED, TransactionContribInfo::Status::CANCELLED, + TransactionContribInfo::Status::FINISHED}; + +set TransactionContribInfo::toStrings(set const& coll) { + set result; + for (auto const& s : coll) { + result.insert(status2str(s)); + } + return result; +} + +string const& TransactionContribInfo::status2str(Status status) { auto itr = _transactionContribStatus2str.find(status); if (itr == _transactionContribStatus2str.cend()) { throw invalid_argument("DatabaseServices::" + string(__func__) + diff --git a/src/replica/ingest/TransactionContrib.h b/src/replica/ingest/TransactionContrib.h index 07806ac9c..ff3fc5eda 100644 --- a/src/replica/ingest/TransactionContrib.h +++ b/src/replica/ingest/TransactionContrib.h @@ -26,6 +26,7 @@ #include #include #include +#include #include #include @@ -77,7 +78,7 @@ class TransactionContribInfo { /// The type selector is used in the where the tri-state is required. enum class TypeSelector : int { SYNC, ASYNC, SYNC_OR_ASYNC }; - /// @return The string representation of teh selector. + /// @return The string representation of the selector. static std::string typeSelector2str(TypeSelector typeSelector); bool async = false; ///< The type of the request @@ -200,6 +201,10 @@ class TransactionContribInfo { FINISHED // The request succeeded } status; + /// The collection of all known status codes if a client needs to iterate over them. + static std::set const allStatuses; + static std::set toStrings(std::set const& coll); + /// The temporary file that was created to store pre-processed content of the input /// file before ingesting it into MySQL. The file is supposed to be deleted after finishing /// ingesting the contribution or in case of any failures. Though, in some failure modes diff --git a/src/replica/services/DatabaseServices.cc b/src/replica/services/DatabaseServices.cc index aca180348..d97f56cc3 100644 --- a/src/replica/services/DatabaseServices.cc +++ b/src/replica/services/DatabaseServices.cc @@ -128,6 +128,20 @@ json JobInfo::toJson() const { return info; } +set const TransactionInfo::allStates = { + TransactionInfo::State::IS_STARTING, TransactionInfo::State::STARTED, + TransactionInfo::State::IS_FINISHING, TransactionInfo::State::IS_ABORTING, + TransactionInfo::State::FINISHED, TransactionInfo::State::ABORTED, + TransactionInfo::State::START_FAILED, TransactionInfo::State::FINISH_FAILED, + TransactionInfo::State::ABORT_FAILED}; + +set TransactionInfo::toStrings(set const& coll) { + set result; + for (auto const& s : coll) { + result.insert(state2string(s)); + } + return result; +} TransactionInfo::State TransactionInfo::string2state(string const& str) { if ("IS_STARTING" == str) return State::IS_STARTING; if ("STARTED" == str) return State::STARTED; @@ -138,8 +152,8 @@ TransactionInfo::State TransactionInfo::string2state(string const& str) { if ("START_FAILED" == str) return State::START_FAILED; if ("FINISH_FAILED" == str) return State::FINISH_FAILED; if ("ABORT_FAILED" == str) return State::ABORT_FAILED; - throw runtime_error("DatabaseServices::" + string(__func__) + " unknown transaction state: '" + str + - "'"); + throw invalid_argument("DatabaseServices::" + string(__func__) + " unknown transaction state: '" + str + + "'"); } string TransactionInfo::state2string(State state) { diff --git a/src/replica/services/DatabaseServices.h b/src/replica/services/DatabaseServices.h index 70b8b011b..b169318fe 100644 --- a/src/replica/services/DatabaseServices.h +++ b/src/replica/services/DatabaseServices.h @@ -27,6 +27,7 @@ #include #include #include +#include #include #include #include @@ -213,7 +214,13 @@ class TransactionInfo { ABORT_FAILED ///< the failed (inactive) state, next states: (IS_ABORTING) }; + /// The collection of all known state codes if a client needs to iterate over them. + static std::set const allStates; + static std::set toStrings(std::set const& coll); + + /// @throws std::invalid_argument If the string doesn't match any known state code. static State string2state(std::string const& str); + static std::string state2string(State state); /// @brief Verify if the proposed state transition is possible. @@ -843,11 +850,13 @@ class DatabaseServices : public std::enable_shared_from_this { /// @param databaseName (optional) the name of a database /// @param includeContext (optional) flag that (if 'true') would pull the transaction context /// @param includeLog (optional) flag that (if 'true') would pull the transaction log (events) + /// @param stateSelector (optional) a collection of the desired states of the transactions (all states if + /// empty) /// @return a collection of super-transactions (all of them or for the specified database only) /// @throws std::invalid_argument if database name is not valid - virtual std::vector transactions(std::string const& databaseName = std::string(), - bool includeContext = false, - bool includeLog = false) = 0; + virtual std::vector transactions( + std::string const& databaseName = std::string(), bool includeContext = false, + bool includeLog = false, std::set const& stateSelector = {}) = 0; /// @param state the desired state of the transactions /// @param includeContext (optional) flag that (if 'true') would pull the transacion context @@ -928,54 +937,63 @@ class DatabaseServices : public std::enable_shared_from_this { /// @return the desired contribution into a super-transaction (if found) /// @param id a unique identifier of the contribution + /// @param includeExtensions if 'true' then include info on the contributions extensions (see schema) /// @param includeWarnings if 'true' then include info on the MySQL warnings after LOAD DATA INFILE /// @param includeRetries if 'true' then include info on the failed retries to pull the input data /// @throws DatabaseServicesNotFound if no contribution was found for the specified identifier - virtual TransactionContribInfo transactionContrib(unsigned int id, bool includeWarnings = false, + virtual TransactionContribInfo transactionContrib(unsigned int id, bool includeExtensions = false, + bool includeWarnings = false, bool includeRetries = false) = 0; /// @return contributions into a super-transaction for the given selectors - /// @param transactionId a unique identifier of the transaction - /// @param table (optional) the base name of a table (all tables if not provided) - /// @param workerName (optional) the name of a worker (all workers if not provided) - /// @param typeSelector (optional) type of the contributions - /// @param includeWarnings if 'true' then include info on the MySQL warnings after LOAD DATA INFILE - /// @param includeRetries if 'true' then include info on the failed retries to pull the input data + /// @param transactionId a unique identifier of the transaction (all transactions if 0) + /// @param table the base name of a table (all tables if empty) + /// @param workerName the name of a worker (all workers if empty) + /// @param statusSelector (optional) the desired status of the contributions (all statuses if empty) + /// @param typeSelector (optional) type of the contributions (all types if empty) + /// @param chunkSelector (optional) the desired chunk number of the contributions (all chunks if -1) + /// @param includeExtensions if 'true' then include info on the contributions extensions (see schema) + /// @param includeWarnings (optional) if 'true' then include info on the MySQL warnings after LOAD DATA + /// INFILE + /// @param includeRetries (optional) if 'true' then include info on the failed retries to pull the input + /// data + /// @param minRetries (optional) the minimum number of retries for a contribution to be reported (no limit + /// if 0) + /// @param minWarnings (optional) the minimum number of warnings for a contribution to be reported (no + /// limit if 0) + /// @param maxEntries (optional) the maximum number of contributions to be reported (no limit if 0). virtual std::vector transactionContribs( - TransactionId transactionId, std::string const& table = std::string(), - std::string const& workerName = std::string(), + TransactionId transactionId, std::string const& table, std::string const& workerName, + std::set const& statusSelector = {}, TransactionContribInfo::TypeSelector typeSelector = TransactionContribInfo::TypeSelector::SYNC_OR_ASYNC, - bool includeWarnings = false, bool includeRetries = false) = 0; - - /// @return contributions into a super-transaction for the given selectors - /// @param transactionId a unique identifier of the transaction - /// @param status the desired status of the contributions - /// @param table (optional) the base name of a table (all tables if not provided) - /// @param workerName (optional) the name of a worker (all workers if not provided) - /// @param typeSelector (optional) type of the contributions - /// @param includeWarnings if 'true' then include info on the MySQL warnings after LOAD DATA INFILE - /// @param includeRetries if 'true' then include info on the failed retries to pull the input data - virtual std::vector transactionContribs( - TransactionId transactionId, TransactionContribInfo::Status status, - std::string const& table = std::string(), std::string const& workerName = std::string(), - TransactionContribInfo::TypeSelector typeSelector = - TransactionContribInfo::TypeSelector::SYNC_OR_ASYNC, - bool includeWarnings = false, bool includeRetries = false) = 0; + int chunkSelector = -1, bool includeExtensions = false, bool includeWarnings = false, + bool includeRetries = false, size_t minRetries = 0, size_t minWarnings = 0, + size_t maxEntries = 0) = 0; /// @return contributions into super-transactions for the given selectors /// @param database the name of a database - /// @param table (optional) the base name of a table (all tables if not provided) - /// @param workerName (optional) the name of a worker (all workers if not provided) - /// @param typeSelector (optional) type of the contributions - /// @param includeWarnings if 'true' then include info on the MySQL warnings after LOAD DATA INFILE - /// @param includeRetries if 'true' then include info on the failed retries to pull the input data + /// @param table the base name of a table (all tables if empty) + /// @param workerName the name of a worker (all workers if empty) + /// @param statusSelector (optional) the desired status of the contributions (all statuses if empty) + /// @param typeSelector (optional) type of the contributions (all types if empty) + /// @param includeExtensions if 'true' then include info on the contributions extensions (see schema) + /// @param includeWarnings (optional) if 'true' then include info on the MySQL warnings after LOAD DATA + /// INFILE + /// @param includeRetries (optional) if 'true' then include info on the failed retries to pull the input + /// data + /// @param minRetries (optional) the minimum number of retries for a contribution to be reported (no limit + /// if 0) + /// @param minWarnings (optional) the minimum number of warnings for a contribution to be reported (no + /// limit if 0) + /// @param maxEntries (optional) the maximum number of contributions to be reported (no limit if 0). virtual std::vector transactionContribs( - std::string const& database, std::string const& table = std::string(), - std::string const& workerName = std::string(), + std::string const& database, std::string const& table, std::string const& workerName, + std::set const& statusSelector = {}, TransactionContribInfo::TypeSelector typeSelector = TransactionContribInfo::TypeSelector::SYNC_OR_ASYNC, - bool includeWarnings = false, bool includeRetries = false) = 0; + bool includeExtensions = false, bool includeWarnings = false, bool includeRetries = false, + size_t minRetries = 0, size_t minWarnings = 0, size_t maxEntries = 0) = 0; /** * Insert the initial record on the contribution. diff --git a/src/replica/services/DatabaseServicesMySQL.cc b/src/replica/services/DatabaseServicesMySQL.cc index 813e85d7c..1b4f05531 100644 --- a/src/replica/services/DatabaseServicesMySQL.cc +++ b/src/replica/services/DatabaseServicesMySQL.cc @@ -1315,13 +1315,15 @@ TransactionInfo DatabaseServicesMySQL::transaction(TransactionId id, bool includ return info; } -vector DatabaseServicesMySQL::transactions(string const& databaseName, bool includeContext, - bool includeLog) { +vector DatabaseServicesMySQL::transactions( + string const& databaseName, bool includeContext, bool includeLog, + set const& stateSelector) { string const context = _context(__func__) + "databaseName=" + databaseName + " "; LOGS(_log, LOG_LVL_DEBUG, context); _assertDatabaseIsValid(context, databaseName); + string const predicate = _g.packConds(databaseName.empty() ? "" : _g.eq("database", databaseName), + _g.in("state", TransactionInfo::toStrings(stateSelector))); replica::Lock lock(_mtx, context); - string const predicate = databaseName.empty() ? "" : _g.eq("database", databaseName); return _transactions(lock, predicate, includeContext, includeLog); } @@ -1585,15 +1587,16 @@ vector DatabaseServicesMySQL::_findTransactionsImpl(replica::Lo return collection; } -TransactionContribInfo DatabaseServicesMySQL::transactionContrib(unsigned int id, bool includeWarnings, - bool includeRetries) { +TransactionContribInfo DatabaseServicesMySQL::transactionContrib(unsigned int id, bool includeExtensions, + bool includeWarnings, bool includeRetries) { string const context = _context(__func__) + "id=" + to_string(id) + " "; LOGS(_log, LOG_LVL_DEBUG, context); vector collection; replica::Lock lock(_mtx, context); try { _conn->executeInOwnTransaction([&](decltype(_conn) conn) { - collection = _transactionContribsImpl(lock, _g.eq("id", id), includeWarnings, includeRetries); + collection = _transactionContribsImpl(lock, _g.eq("id", id), includeExtensions, includeWarnings, + includeRetries); }); } catch (exception const& ex) { LOGS(_log, LOG_LVL_ERROR, context << "failed, exception: " << ex.what()); @@ -1620,57 +1623,51 @@ string DatabaseServicesMySQL::_typeSelectorPredicate( case TransactionContribInfo::TypeSelector::SYNC_OR_ASYNC: return string(); } - return string(); + throw runtime_error("DatabaseServicesMySQL::" + string(__func__) + " unhandled type selector " + + to_string(static_cast(typeSelector))); } vector DatabaseServicesMySQL::transactionContribs( TransactionId transactionId, string const& table, string const& workerName, - TransactionContribInfo::TypeSelector typeSelector, bool includeWarnings, bool includeRetries) { + set const& statusSelector, + TransactionContribInfo::TypeSelector typeSelector, int chunkSelector, bool includeExtensions, + bool includeWarnings, bool includeRetries, size_t minRetries, size_t minWarnings, size_t maxEntries) { string const context = _context(__func__) + "transactionId=" + to_string(transactionId) + " table=" + table + " worker=" + workerName + " " + " typeSelector=" + TransactionContribInfo::typeSelector2str(typeSelector) + " "; LOGS(_log, LOG_LVL_DEBUG, context); - replica::Lock lock(_mtx, context); - string const predicate = _g.packConds( - _g.eq("transaction_id", transactionId), table.empty() ? "" : _g.eq("table", table), - workerName.empty() ? "" : _g.eq("worker", workerName), _typeSelectorPredicate(typeSelector)); - return _transactionContribs(lock, predicate, includeWarnings, includeRetries); -} - -vector DatabaseServicesMySQL::transactionContribs( - TransactionId transactionId, TransactionContribInfo::Status status, string const& table, - string const& workerName, TransactionContribInfo::TypeSelector typeSelector, bool includeWarnings, - bool includeRetries) { - string const context = _context(__func__) + "transactionId=" + to_string(transactionId) + - " status=" + TransactionContribInfo::status2str(status) + " table=" + table + - " worker=" + workerName + " " + - " typeSelector=" + TransactionContribInfo::typeSelector2str(typeSelector) + " "; - LOGS(_log, LOG_LVL_DEBUG, context); + transactionId == 0 ? "" : _g.eq("transaction_id", transactionId), + table.empty() ? "" : _g.eq("table", table), workerName.empty() ? "" : _g.eq("worker", workerName), + _g.in("status", TransactionContribInfo::toStrings(statusSelector)), + _typeSelectorPredicate(typeSelector), chunkSelector < 0 ? "" : _g.eq("chunk", chunkSelector), + minRetries == 0 ? "" : _g.geq("num_failed_retries", minRetries), + minWarnings == 0 ? "" : _g.geq("num_warnings", minWarnings)); replica::Lock lock(_mtx, context); - string const predicate = _g.packConds(_g.eq("transaction_id", transactionId), - _g.eq("status", TransactionContribInfo::status2str(status)), - table.empty() ? "" : _g.eq("table", table), - workerName.empty() ? "" : _g.eq("worker", workerName), - _typeSelectorPredicate(typeSelector)); - return _transactionContribs(lock, predicate, includeWarnings, includeRetries); + return _transactionContribs(lock, predicate, includeExtensions, includeWarnings, includeRetries, + maxEntries); } vector DatabaseServicesMySQL::transactionContribs( string const& database, string const& table, string const& workerName, - TransactionContribInfo::TypeSelector typeSelector, bool includeWarnings, bool includeRetries) { + set const& statusSelector, + TransactionContribInfo::TypeSelector typeSelector, bool includeExtensions, bool includeWarnings, + bool includeRetries, size_t minRetries, size_t minWarnings, size_t maxEntries) { string const context = _context(__func__) + "database=" + database + " table=" + table + " worker=" + workerName + " " + " typeSelector=" + TransactionContribInfo::typeSelector2str(typeSelector) + " "; LOGS(_log, LOG_LVL_DEBUG, context); - _assertDatabaseIsValid(context, database); - + string const predicate = + _g.packConds(_g.eq("database", database), table.empty() ? "" : _g.eq("table", table), + workerName.empty() ? "" : _g.eq("worker", workerName), + _g.in("status", TransactionContribInfo::toStrings(statusSelector)), + _typeSelectorPredicate(typeSelector), + minRetries == 0 ? "" : _g.geq("num_failed_retries", minRetries), + minWarnings == 0 ? "" : _g.geq("num_warnings", minWarnings)); replica::Lock lock(_mtx, context); - string const predicate = _g.packConds( - _g.eq("database", database), table.empty() ? "" : _g.eq("table", table), - workerName.empty() ? "" : _g.eq("worker", workerName), _typeSelectorPredicate(typeSelector)); - return _transactionContribs(lock, predicate, includeWarnings, includeRetries); + return _transactionContribs(lock, predicate, includeExtensions, includeWarnings, includeRetries, + maxEntries); } TransactionContribInfo DatabaseServicesMySQL::createdTransactionContrib( @@ -1748,6 +1745,7 @@ TransactionContribInfo DatabaseServicesMySQL::updateTransactionContrib(Transacti string const predicate = _g.eq("id", info.id); // These flags should be set to retain the full state of the contribution descriptor // upter reloading it from the database. + bool const includeExtensions = true; bool const includeWarnings = true; bool const includeRetries = true; TransactionContribInfo updatedInfo; @@ -1788,7 +1786,8 @@ TransactionContribInfo DatabaseServicesMySQL::updateTransactionContrib(Transacti for (auto&& query : queries) { conn->execute(query); } - updatedInfo = _transactionContribImpl(lock, predicate, includeWarnings, includeRetries); + updatedInfo = _transactionContribImpl(lock, predicate, includeExtensions, includeWarnings, + includeRetries); }); } catch (exception const& ex) { LOGS(_log, LOG_LVL_ERROR, context << "failed, exception: " << ex.what()); @@ -1808,6 +1807,7 @@ TransactionContribInfo DatabaseServicesMySQL::saveLastTransactionContribRetry( auto const& lastRetry = info.failedRetries.back(); // These flags should be set to retain the full state of the contribution descriptor // upter reloading it from the database. + bool const includeExtensions = true; bool const includeWarnings = true; bool const includeRetries = true; TransactionContribInfo updatedInfo; @@ -1826,7 +1826,8 @@ TransactionContribInfo DatabaseServicesMySQL::saveLastTransactionContribRetry( for (auto&& query : queries) { conn->execute(query); } - updatedInfo = _transactionContribImpl(lock, predicate, includeWarnings, includeRetries); + updatedInfo = _transactionContribImpl(lock, predicate, includeExtensions, includeWarnings, + includeRetries); }); } catch (exception const& ex) { LOGS(_log, LOG_LVL_ERROR, context << "failed, exception: " << ex.what()); @@ -1836,16 +1837,17 @@ TransactionContribInfo DatabaseServicesMySQL::saveLastTransactionContribRetry( return updatedInfo; } -vector DatabaseServicesMySQL::_transactionContribs(replica::Lock const& lock, - string const& predicate, - bool includeWarnings, - bool includeRetries) { - string const context = _context(__func__) + "predicate=" + predicate + " "; +vector DatabaseServicesMySQL::_transactionContribs( + replica::Lock const& lock, string const& predicate, bool includeExtensions, bool includeWarnings, + bool includeRetries, size_t maxEntries) { + string const context = + _context(__func__) + "predicate=" + predicate + " maxEntries=" + to_string(maxEntries) + " "; LOGS(_log, LOG_LVL_DEBUG, context); vector collection; try { _conn->executeInOwnTransaction([&](decltype(_conn) conn) { - collection = _transactionContribsImpl(lock, predicate, includeWarnings, includeRetries); + collection = _transactionContribsImpl(lock, predicate, includeExtensions, includeWarnings, + includeRetries, maxEntries); }); } catch (exception const& ex) { LOGS(_log, LOG_LVL_ERROR, context << "failed, exception: " << ex.what()); @@ -1857,25 +1859,27 @@ vector DatabaseServicesMySQL::_transactionContribs(repli TransactionContribInfo DatabaseServicesMySQL::_transactionContribImpl(replica::Lock const& lock, string const& predicate, + bool includeExtensions, bool includeWarnings, bool includeRetries) { string const context = _context(__func__) + "predicate=" + predicate + " "; - auto const collection = _transactionContribsImpl(lock, predicate, includeWarnings, includeRetries); + auto const collection = + _transactionContribsImpl(lock, predicate, includeExtensions, includeWarnings, includeRetries); size_t const num = collection.size(); if (num == 1) return collection[0]; if (num == 0) throw DatabaseServicesNotFound(context + "no such transaction contribution"); throw DatabaseServicesError(context + "two many transaction contributions found: " + to_string(num)); } -vector DatabaseServicesMySQL::_transactionContribsImpl(replica::Lock const& lock, - string const& predicate, - bool includeWarnings, - bool includeRetries) { +vector DatabaseServicesMySQL::_transactionContribsImpl( + replica::Lock const& lock, string const& predicate, bool includeExtensions, bool includeWarnings, + bool includeRetries, size_t maxEntries) { string const context = _context(__func__) + "predicate=" + predicate + " "; LOGS(_log, LOG_LVL_DEBUG, context); vector collection; - string const query = _g.select(Sql::STAR) + _g.from("transaction_contrib") + _g.where(predicate); + string const query = _g.select(Sql::STAR) + _g.from("transaction_contrib") + _g.where(predicate) + + _g.limit(maxEntries); _conn->execute(query); if (_conn->hasResult()) { database::mysql::Row row; @@ -1913,40 +1917,42 @@ vector DatabaseServicesMySQL::_transactionContribsImpl(r collection.push_back(info); } } - for (auto& contrib : collection) { - string const query = _g.select("key", "val") + _g.from("transaction_contrib_ext") + - _g.where(_g.eq("contrib_id", contrib.id)); - _conn->execute(query); - if (_conn->hasResult()) { - database::mysql::Row row; - while (_conn->next(row)) { - string key; - row.get("key", key); - if (key.empty()) continue; - string val; - row.get("val", val); - if (val.empty()) continue; - if (key == "max_num_warnings") - contrib.maxNumWarnings = lsst::qserv::stoui(val); - else if (key == "fields_terminated_by") - contrib.dialectInput.fieldsTerminatedBy = val; - else if (key == "fields_enclosed_by") - contrib.dialectInput.fieldsEnclosedBy = val; - else if (key == "fields_escaped_by") - contrib.dialectInput.fieldsEscapedBy = val; - else if (key == "lines_terminated_by") - contrib.dialectInput.linesTerminatedBy = val; - else if (key == "http_method") - contrib.httpMethod = http::string2method(val); - else if (key == "http_data") - contrib.httpData = val; - else if (key == "http_headers") - contrib.httpHeaders.emplace_back(val); - else if (key == "charset_name") - contrib.charsetName = val; - else { - throw DatabaseServicesError(context + "unexpected extended parameter '" + key + - "' for contribution id=" + to_string(contrib.id)); + if (includeExtensions) { + for (auto& contrib : collection) { + string const query = _g.select("key", "val") + _g.from("transaction_contrib_ext") + + _g.where(_g.eq("contrib_id", contrib.id)); + _conn->execute(query); + if (_conn->hasResult()) { + database::mysql::Row row; + while (_conn->next(row)) { + string key; + row.get("key", key); + if (key.empty()) continue; + string val; + row.get("val", val); + if (val.empty()) continue; + if (key == "max_num_warnings") + contrib.maxNumWarnings = lsst::qserv::stoui(val); + else if (key == "fields_terminated_by") + contrib.dialectInput.fieldsTerminatedBy = val; + else if (key == "fields_enclosed_by") + contrib.dialectInput.fieldsEnclosedBy = val; + else if (key == "fields_escaped_by") + contrib.dialectInput.fieldsEscapedBy = val; + else if (key == "lines_terminated_by") + contrib.dialectInput.linesTerminatedBy = val; + else if (key == "http_method") + contrib.httpMethod = http::string2method(val); + else if (key == "http_data") + contrib.httpData = val; + else if (key == "http_headers") + contrib.httpHeaders.emplace_back(val); + else if (key == "charset_name") + contrib.charsetName = val; + else { + throw DatabaseServicesError(context + "unexpected extended parameter '" + key + + "' for contribution id=" + to_string(contrib.id)); + } } } } diff --git a/src/replica/services/DatabaseServicesMySQL.h b/src/replica/services/DatabaseServicesMySQL.h index 1e6b3c31a..f7a17ca01 100644 --- a/src/replica/services/DatabaseServicesMySQL.h +++ b/src/replica/services/DatabaseServicesMySQL.h @@ -144,8 +144,9 @@ class DatabaseServicesMySQL : public DatabaseServices { TransactionInfo transaction(TransactionId id, bool includeContext = false, bool includeLog = false) final; - std::vector transactions(std::string const& databaseName = std::string(), - bool includeContext = false, bool includeLog = false) final; + std::vector transactions( + std::string const& databaseName = std::string(), bool includeContext = false, + bool includeLog = false, std::set const& stateSelector = {}) final; std::vector transactions(TransactionInfo::State state, bool includeContext = false, bool includeLog = false) final; @@ -163,29 +164,26 @@ class DatabaseServicesMySQL : public DatabaseServices { TransactionInfo updateTransaction(TransactionId id, std::unordered_map const& events) final; - TransactionContribInfo transactionContrib(unsigned int id, bool includeWarnings = false, + TransactionContribInfo transactionContrib(unsigned int id, bool includeExtensions = false, + bool includeWarnings = false, bool includeRetries = false) final; std::vector transactionContribs( - TransactionId transactionId, std::string const& table = std::string(), - std::string const& workerName = std::string(), + TransactionId transactionId, std::string const& table, std::string const& workerName, + std::set const& statusSelector = {}, TransactionContribInfo::TypeSelector typeSelector = TransactionContribInfo::TypeSelector::SYNC_OR_ASYNC, - bool includeWarnings = false, bool includeRetries = false) final; + int chunkSelector = -1, bool includeExtensions = false, bool includeWarnings = false, + bool includeRetries = false, size_t minRetries = 0, size_t minWarnings = 0, + size_t maxEntries = 0) final; std::vector transactionContribs( - TransactionId transactionId, TransactionContribInfo::Status status, - std::string const& table = std::string(), std::string const& workerName = std::string(), + std::string const& database, std::string const& table, std::string const& workerName, + std::set const& statusSelector = {}, TransactionContribInfo::TypeSelector typeSelector = TransactionContribInfo::TypeSelector::SYNC_OR_ASYNC, - bool includeWarnings = false, bool includeRetries = false) final; - - std::vector transactionContribs( - std::string const& database, std::string const& table = std::string(), - std::string const& workerName = std::string(), - TransactionContribInfo::TypeSelector typeSelector = - TransactionContribInfo::TypeSelector::SYNC_OR_ASYNC, - bool includeWarnings = false, bool includeRetries = false) final; + bool includeExtensions = false, bool includeWarnings = false, bool includeRetries = false, + size_t minRetries = 0, size_t minWarnings = 0, size_t maxEntries = 0) final; TransactionContribInfo createdTransactionContrib( TransactionContribInfo const& info, bool failed = false, @@ -347,18 +345,17 @@ class DatabaseServicesMySQL : public DatabaseServices { std::string const& predicate, bool includeContext, bool includeLog); - std::vector _transactionContribs(replica::Lock const& lock, - std::string const& predicate, - bool includeWarnings = false, - bool includeRetries = false); + std::vector _transactionContribs( + replica::Lock const& lock, std::string const& predicate, bool includeExtensions = false, + bool includeWarnings = false, bool includeRetries = false, size_t maxEntries = 0); TransactionContribInfo _transactionContribImpl(replica::Lock const& lock, std::string const& predicate, + bool includeExtensions = false, bool includeWarnings = false, bool includeRetries = false); - std::vector _transactionContribsImpl(replica::Lock const& lock, - std::string const& predicate, - bool includeWarnings = false, - bool includeRetries = false); + std::vector _transactionContribsImpl( + replica::Lock const& lock, std::string const& predicate, bool includeExtensions = false, + bool includeWarnings = false, bool includeRetries = false, size_t maxEntries = 0); DatabaseIngestParam _ingestParamImpl(replica::Lock const& lock, std::string const& predicate); diff --git a/src/replica/services/DatabaseServicesPool.cc b/src/replica/services/DatabaseServicesPool.cc index 66ed28123..21ca2a118 100644 --- a/src/replica/services/DatabaseServicesPool.cc +++ b/src/replica/services/DatabaseServicesPool.cc @@ -252,9 +252,10 @@ TransactionInfo DatabaseServicesPool::transaction(TransactionId id, bool include } vector DatabaseServicesPool::transactions(string const& databaseName, bool includeContext, - bool includeLog) { + bool includeLog, + set const& stateSelector) { ServiceAllocator service(shared_from_base()); - return service()->transactions(databaseName, includeContext, includeLog); + return service()->transactions(databaseName, includeContext, includeLog, stateSelector); } vector DatabaseServicesPool::transactions(TransactionInfo::State state, bool includeContext, @@ -287,35 +288,32 @@ TransactionInfo DatabaseServicesPool::updateTransaction(TransactionId id, return service()->updateTransaction(id, events); } -TransactionContribInfo DatabaseServicesPool::transactionContrib(unsigned int id, bool includeWarnings, - bool includeRetries) { +TransactionContribInfo DatabaseServicesPool::transactionContrib(unsigned int id, bool includeExtensions, + bool includeWarnings, bool includeRetries) { ServiceAllocator service(shared_from_base()); - return service()->transactionContrib(id, includeWarnings, includeRetries); + return service()->transactionContrib(id, includeExtensions, includeWarnings, includeRetries); } vector DatabaseServicesPool::transactionContribs( TransactionId transactionId, string const& table, string const& workerName, - TransactionContribInfo::TypeSelector typeSelector, bool includeWarnings, bool includeRetries) { - ServiceAllocator service(shared_from_base()); - return service()->transactionContribs(transactionId, table, workerName, typeSelector, includeWarnings, - includeRetries); -} - -vector DatabaseServicesPool::transactionContribs( - TransactionId transactionId, TransactionContribInfo::Status status, string const& table, - string const& workerName, TransactionContribInfo::TypeSelector typeSelector, bool includeWarnings, - bool includeRetries) { + set const& statusSelector, + TransactionContribInfo::TypeSelector typeSelector, int chunkSelector, bool includeExtensions, + bool includeWarnings, bool includeRetries, size_t minRetries, size_t minWarnings, size_t maxEntries) { ServiceAllocator service(shared_from_base()); - return service()->transactionContribs(transactionId, status, table, workerName, typeSelector, - includeWarnings, includeRetries); + return service()->transactionContribs(transactionId, table, workerName, statusSelector, typeSelector, + chunkSelector, includeExtensions, includeWarnings, includeRetries, + minRetries, minWarnings, maxEntries); } vector DatabaseServicesPool::transactionContribs( string const& database, string const& table, string const& workerName, - TransactionContribInfo::TypeSelector typeSelector, bool includeWarnings, bool includeRetries) { + set const& statusSelector, + TransactionContribInfo::TypeSelector typeSelector, bool includeExtensions, bool includeWarnings, + bool includeRetries, size_t minRetries, size_t minWarnings, size_t maxEntries) { ServiceAllocator service(shared_from_base()); - return service()->transactionContribs(database, table, workerName, typeSelector, includeWarnings, - includeRetries); + return service()->transactionContribs(database, table, workerName, statusSelector, typeSelector, + includeExtensions, includeWarnings, includeRetries, minRetries, + minWarnings, maxEntries); } TransactionContribInfo DatabaseServicesPool::createdTransactionContrib( diff --git a/src/replica/services/DatabaseServicesPool.h b/src/replica/services/DatabaseServicesPool.h index a6fcb4549..edd2f3d27 100644 --- a/src/replica/services/DatabaseServicesPool.h +++ b/src/replica/services/DatabaseServicesPool.h @@ -145,8 +145,9 @@ class DatabaseServicesPool : public DatabaseServices { TransactionInfo transaction(TransactionId id, bool includeContext = false, bool includeLog = false) final; - std::vector transactions(std::string const& databaseName = std::string(), - bool includeContext = false, bool includeLog = false) final; + std::vector transactions( + std::string const& databaseName = std::string(), bool includeContext = false, + bool includeLog = false, std::set const& stateSelector = {}) final; std::vector transactions(TransactionInfo::State state, bool includeContext = false, bool includeLog = false) final; @@ -164,29 +165,26 @@ class DatabaseServicesPool : public DatabaseServices { TransactionInfo updateTransaction(TransactionId id, std::unordered_map const& events) final; - TransactionContribInfo transactionContrib(unsigned int id, bool includeWarnings = false, + TransactionContribInfo transactionContrib(unsigned int id, bool includeExtensions = false, + bool includeWarnings = false, bool includeRetries = false) final; std::vector transactionContribs( - TransactionId transactionId, std::string const& table = std::string(), - std::string const& workerName = std::string(), + TransactionId transactionId, std::string const& table, std::string const& workerName, + std::set const& statusSelector = {}, TransactionContribInfo::TypeSelector typeSelector = TransactionContribInfo::TypeSelector::SYNC_OR_ASYNC, - bool includeWarnings = false, bool includeRetries = false) final; + int chunkSelector = -1, bool includeExtensions = false, bool includeWarnings = false, + bool includeRetries = false, size_t minRetries = 0, size_t minWarnings = 0, + size_t maxEntries = 0) final; std::vector transactionContribs( - TransactionId transactionId, TransactionContribInfo::Status status, - std::string const& table = std::string(), std::string const& workerName = std::string(), + std::string const& database, std::string const& table, std::string const& workerName, + std::set const& statusSelector = {}, TransactionContribInfo::TypeSelector typeSelector = TransactionContribInfo::TypeSelector::SYNC_OR_ASYNC, - bool includeWarnings = false, bool includeRetries = false) final; - - std::vector transactionContribs( - std::string const& database, std::string const& table = std::string(), - std::string const& workerName = std::string(), - TransactionContribInfo::TypeSelector typeSelector = - TransactionContribInfo::TypeSelector::SYNC_OR_ASYNC, - bool includeWarnings = false, bool includeRetries = false) final; + bool includeExtensions = false, bool includeWarnings = false, bool includeRetries = false, + size_t minRetries = 0, size_t minWarnings = 0, size_t maxEntries = 0) final; TransactionContribInfo createdTransactionContrib( TransactionContribInfo const& info, bool failed = false, diff --git a/src/util/String.cc b/src/util/String.cc index ebd575b3d..5a594af0b 100644 --- a/src/util/String.cc +++ b/src/util/String.cc @@ -80,16 +80,16 @@ char const hexCharsLC[16] = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ' namespace lsst::qserv::util { -vector String::split(string const& original, string const& delimiter, bool greedy) { - // Apply trivial optimizations. Note that the specified "greedy" behavior +vector String::split(string const& original, string const& delimiter, bool skipEmpty) { + // Apply trivial optimizations. Note that the specified "skipEmpty" behavior // must be preserved during the optimisations. vector result; if (original.empty()) { - if (!greedy) result.push_back(original); + if (!skipEmpty) result.push_back(original); return result; } if (delimiter.empty()) { - if (!original.empty() || !greedy) result.push_back(original); + if (!original.empty() || !skipEmpty) result.push_back(original); return result; } string str(original); @@ -101,23 +101,23 @@ vector String::split(string const& original, string const& delimiter, bo loop = false; } auto const candidate = str.substr(0, pos); - if (!candidate.empty() || !greedy) result.push_back(candidate); + if (!candidate.empty() || !skipEmpty) result.push_back(candidate); str = str.substr(pos + delimiter.length()); } return result; } vector String::parseToVectInt(string const& str, string const& delimiter, bool throwOnError, - int defaultVal, bool greedy) { + int defaultVal, bool skipEmpty) { auto const parseNumber = [](string const& str, size_t& sz) -> int { return stoi(str, &sz); }; - return ::getNumericVectFromStr(__func__, split(str, delimiter, greedy), parseNumber, throwOnError, + return ::getNumericVectFromStr(__func__, split(str, delimiter, skipEmpty), parseNumber, throwOnError, defaultVal); } vector String::parseToVectUInt64(string const& str, string const& delimiter, bool throwOnError, - uint64_t defaultVal, bool greedy) { + uint64_t defaultVal, bool skipEmpty) { auto const parseNumber = [](string const& str, size_t& sz) -> uint64_t { return stoull(str, &sz); }; - return ::getNumericVectFromStr(__func__, split(str, delimiter, greedy), parseNumber, + return ::getNumericVectFromStr(__func__, split(str, delimiter, skipEmpty), parseNumber, throwOnError, defaultVal); } diff --git a/src/util/String.h b/src/util/String.h index 5e61a80c1..6e695f4fc 100644 --- a/src/util/String.h +++ b/src/util/String.h @@ -38,18 +38,18 @@ class String { * Split the input string into substrings using the specified delimiter. * @param str The input string to be parsed. * @param delimiter A delimiter. - * @param greedy The optional flag that if 'true' would eliminate empty strings from + * @param skipEmpty The optional flag that if 'true' would eliminate empty strings from * the result. Otherwise the empty strings found between the delimiter will * be preserved in the result. - * @note The result filtering requested by a value of the parameter 'greedy' + * @note The result filtering requested by a value of the parameter 'skipEmpty' * also applies to a secenario when the input string is empty. In particular, - * in the 'greedy' mode the output collection will be empty. Otherwise, the collection + * in the 'skipEmpty' mode the output collection will be empty. Otherwise, the collection * will have exactly one element - the empty string. * @return A collection of strings resulting from splitting the input string into * sub-strings using the delimiter. The delimiter won't be included into the substrings. */ static std::vector split(std::string const& str, std::string const& delimiter, - bool greedy = false); + bool skipEmpty = false); /** * Parse the input string into a collection of numbers (int). @@ -59,7 +59,7 @@ class String { * from substrings to numbers fail. If the flag is set to 'true' then an exception * will be thrown. Otherwise, the default value will be placed into the output collection. * @param defaultVal The optional default value to be injected where applies. - * @param greedy The optional flag that if 'true' would eliminate empty substrings from + * @param skipEmpty The optional flag that if 'true' would eliminate empty substrings from * parsing into the the numeric result. Otherwise the behavior of the methjod will be driven * by a value of the parameter 'throwOnError'. * @return A collection of numbers found in the input string. @@ -68,7 +68,8 @@ class String { * @see function split */ static std::vector parseToVectInt(std::string const& str, std::string const& delimiter, - bool throwOnError = true, int defaultVal = 0, bool greedy = false); + bool throwOnError = true, int defaultVal = 0, + bool skipEmpty = false); /** * Parse the input string into a collection of numbers (std::uint64_t). @@ -76,7 +77,8 @@ class String { */ static std::vector parseToVectUInt64(std::string const& str, std::string const& delimiter, bool throwOnError = true, - std::uint64_t defaultVal = 0ULL, bool greedy = false); + std::uint64_t defaultVal = 0ULL, + bool skipEmpty = false); /** * Pack an iterable collection into a string. diff --git a/src/util/testString.cc b/src/util/testString.cc index 2d2443ced..038f3a404 100644 --- a/src/util/testString.cc +++ b/src/util/testString.cc @@ -112,8 +112,8 @@ BOOST_AUTO_TEST_CASE(SplitStringTest) { { std::string const emptyStr; std::string const delimiter = " "; - bool const greedy = true; - auto const vect = util::String::split(emptyStr, delimiter, greedy); + bool const skipEmpty = true; + auto const vect = util::String::split(emptyStr, delimiter, skipEmpty); LOGS_ERROR("vect=" << util::String::toString(vect, delimiter, "'", "'")); BOOST_CHECK_EQUAL(vect.size(), 0UL); } @@ -144,8 +144,8 @@ BOOST_AUTO_TEST_CASE(SplitStringTest) { BOOST_CHECK_EQUAL(vect.size(), j); } { - bool const greedy = true; - auto const vect = util::String::split(" a b cd e f ", " ", greedy); + bool const skipEmpty = true; + auto const vect = util::String::split(" a b cd e f ", " ", skipEmpty); LOGS_ERROR("vect=" << util::String::toString(vect, " ", "'", "'")); size_t j = 0; BOOST_CHECK_EQUAL(vect[j++], "a"); @@ -233,19 +233,19 @@ BOOST_AUTO_TEST_CASE(GetVecFromStrTest) { auto const vect = util::String::parseToVectInt(str2, ":", false, defaultVal); LOGS_ERROR("vect=" << util::String::toString(vect, " ", "'", "'")); size_t j = 0; - BOOST_CHECK_EQUAL(vect[j++], defaultVal); // The empty string in the non-greedy mode + BOOST_CHECK_EQUAL(vect[j++], defaultVal); // The empty string in the non-skipEmpty mode BOOST_CHECK_EQUAL(vect[j++], 987); BOOST_CHECK_EQUAL(vect[j++], 23); BOOST_CHECK_EQUAL(vect[j++], defaultVal); // Couldn't parse "x8owlq" as a number BOOST_CHECK_EQUAL(vect[j++], 1); BOOST_CHECK_EQUAL(vect[j++], -123); - BOOST_CHECK_EQUAL(vect[j++], defaultVal); // The empty string in the non-greedy mode + BOOST_CHECK_EQUAL(vect[j++], defaultVal); // The empty string in the non-skipEmpty mode BOOST_CHECK_EQUAL(vect.size(), j); } { int const defaultVal = 99; - bool const greedy = true; - auto const vect = util::String::parseToVectInt(str2, ":", false, defaultVal, greedy); + bool const skipEmpty = true; + auto const vect = util::String::parseToVectInt(str2, ":", false, defaultVal, skipEmpty); LOGS_ERROR("vect=" << util::String::toString(vect, " ", "'", "'")); size_t j = 0; BOOST_CHECK_EQUAL(vect[j++], 987); @@ -258,8 +258,8 @@ BOOST_AUTO_TEST_CASE(GetVecFromStrTest) { std::string const str3 = ":123456789123123:23:x8owlq::1:-123:"; { auto const defaultVal = std::numeric_limits::max(); - bool const greedy = true; - auto const vect = util::String::parseToVectUInt64(str3, ":", false, defaultVal, greedy); + bool const skipEmpty = true; + auto const vect = util::String::parseToVectUInt64(str3, ":", false, defaultVal, skipEmpty); LOGS_ERROR("vect=" << util::String::toString(vect, " ", "'", "'")); size_t j = 0; BOOST_CHECK_EQUAL(vect[j++], 123456789123123ULL); @@ -274,14 +274,14 @@ BOOST_AUTO_TEST_CASE(GetVecFromStrTest) { auto const vect = util::String::parseToVectUInt64(str3, ":", false, defaultVal); LOGS_ERROR("vect=" << util::String::toString(vect, " ", "'", "'")); size_t j = 0; - BOOST_CHECK_EQUAL(vect[j++], defaultVal); // The empty string in the non-greedy mode + BOOST_CHECK_EQUAL(vect[j++], defaultVal); // The empty string in the non-skipEmpty mode BOOST_CHECK_EQUAL(vect[j++], 123456789123123ULL); BOOST_CHECK_EQUAL(vect[j++], 23ULL); BOOST_CHECK_EQUAL(vect[j++], defaultVal); // Couldn't parse "x8owlq" as a number - BOOST_CHECK_EQUAL(vect[j++], defaultVal); // The empty string in the non-greedy mode + BOOST_CHECK_EQUAL(vect[j++], defaultVal); // The empty string in the non-skipEmpty mode BOOST_CHECK_EQUAL(vect[j++], 1ULL); BOOST_CHECK_EQUAL(vect[j++], -123LL); - BOOST_CHECK_EQUAL(vect[j++], defaultVal); // The empty string in the non-greedy mode + BOOST_CHECK_EQUAL(vect[j++], defaultVal); // The empty string in the non-skipEmpty mode BOOST_CHECK_EQUAL(vect.size(), j); } } diff --git a/src/www/qserv/js/Common.js b/src/www/qserv/js/Common.js index 39f739aa8..2d6240833 100644 --- a/src/www/qserv/js/Common.js +++ b/src/www/qserv/js/Common.js @@ -6,7 +6,7 @@ function(sqlFormatter, _) { class Common { - static RestAPIVersion = 36; + static RestAPIVersion = 37; static query2text(query, expanded) { if (expanded) { if (query.length > Common._max_expanded_length) { diff --git a/src/www/qserv/js/IngestContribInfo.js b/src/www/qserv/js/IngestContribInfo.js index c3b19c894..91af23277 100644 --- a/src/www/qserv/js/IngestContribInfo.js +++ b/src/www/qserv/js/IngestContribInfo.js @@ -314,7 +314,7 @@ function(CSSLoader, _load_contrib(contrib_id) { Fwk.web_service_GET( "/ingest/trans/contrib/" + contrib_id, - {include_warnings: 1, include_retries: 1, version: Common.RestAPIVersion}, + {include_extensions: 1, include_warnings: 1, include_retries: 1, version: Common.RestAPIVersion}, (data) => { console.log(data["contribution"]); if (!data.success) { diff --git a/src/www/qserv/js/IngestContributions.js b/src/www/qserv/js/IngestContributions.js index cdd0194eb..3b8f68234 100644 --- a/src/www/qserv/js/IngestContributions.js +++ b/src/www/qserv/js/IngestContributions.js @@ -17,19 +17,18 @@ function(CSSLoader, constructor(name) { super(name); - this._data = undefined; + this._files = []; + this._max_num_workers = 0; + this._max_num_tables = 0; + this._max_num_trans = 0; } - /// @see FwkApplication.fwk_app_on_show fwk_app_on_show() { this.fwk_app_on_update(); } - /// @see FwkApplication.fwk_app_on_hide - fwk_app_on_hide() { - } + fwk_app_on_hide() {} - /// @see FwkApplication.fwk_app_on_update fwk_app_on_update() { if (this.fwk_app_visible) { this._init(); @@ -44,14 +43,10 @@ function(CSSLoader, } } - /// Set parameters of the transaction in the selectors and begin loading - /// the contributions in the background. - set_transaction(status, databases, database, transactions, trans_id) { + /// Load and display contributions in the specified context. + search(worker = undefined, database = undefined, table = undefined, transaction = undefined) { this._init(); - this._set_database_status(status); - this._set_databases(databases, database); - this._set_transactions(transactions, trans_id); - this._load(); + this._load(worker, database, table, transaction); } /** @@ -67,50 +62,59 @@ function(CSSLoader,
- - + +
-
+
-
- - -
-
- ${Common.html_update_ival('update-interval', 60)} +
+ +
-
-
- - + +
-
- - +
+ +
- - + + + + +
+
+
- +
@@ -118,46 +122,53 @@ function(CSSLoader,
- - + +
-
- - +
+
+ +
+
+ ${Common.html_update_ival('update-interval', 60)} +
- +
-
-
+
+
+
+
+ + +
+
+ + +
+
+ + +
+
+ + +
+
@@ -199,6 +228,7 @@ function(CSSLoader, + Timing @@ -220,6 +250,7 @@ function(CSSLoader, Id Worker + Trans Table Chunk Overlap @@ -251,21 +282,29 @@ function(CSSLoader,
`; let cont = this.fwk_app_container.html(html); - cont.find("#update-interval").change(() => { + cont.find("#update-interval").on("change", () => { this._load(); }); - cont.find(".loader").change(() => { + cont.find(".loader").on("change", (ev) => { + if (ev.target.id === "contrib-status") { + if (this._get_status() === "IN_PROGRESS") { + this._disable_stage(false); + } else { + this._disable_stage(true); + this._set_stage(""); + } + } this._load(); }); - cont.find(".filter").change(() => { - if (!_.isUndefined(this._data)) this._display(this._data); + cont.find(".filter").on("change", () => { + this._display(); }); - cont.find(".sorter").change(() => { - if (!_.isUndefined(this._data)) this._display(this._data); + cont.find(".sorter").on("change", () => { + this._display(); }); cont.find("button#contrib-reset").click(() => { - this._reset_contrib_filter(); - if (!_.isUndefined(this._data)) this._display(this._data); + this._reset(); + this._load(); }); } _table() { @@ -287,114 +326,175 @@ function(CSSLoader, } return this._form_control_obj[id]; } - _set_num_select(val, total) { this._form_control('input', 'contrib-num-select').val(val + ' / ' + total); } - _get_database_status() { return this._form_control('select', 'contrib-database-status').val(); } - _set_database_status(val) { this._form_control('select', 'contrib-database-status').val(val); } _get_database() { return this._form_control('select', 'contrib-database').val(); } - _set_databases(databases, database=undefined) { + _set_database(val) { this._form_control('select', 'contrib-database').val(val); } + _set_databases(databases, database=undefined, table=undefined) { + databases = _.sortBy(databases, function (info) { return info.database; }); // Keep the current selection after updating the selector in case if the // database belongs to this collection. const current_database = _.isUndefined(database) ? this._get_database() : database; let in_collection = false; this._form_control('select', 'contrib-database').html( - _.reduce(databases, (html, name) => { - if (name === current_database) in_collection = true; + _.reduce(databases, (html, info) => { + if (info.database === current_database) in_collection = true; const selected = !html ? 'selected' : ''; - return html + ``; + return html + ``; }, '') ); - if (in_collection) { - this._form_control('select', 'contrib-database').val(current_database); - } + if (in_collection) this._set_database(current_database); + // Update the tables selector. + const current_database_info = _.find(databases, (info) => { return info.database === this._get_database(); }); + const tables = current_database_info ? current_database_info.tables : []; + this._max_num_tables = _.size(tables); + this._set_tables(tables, table); } - _get_trans_id() { return this._form_control('select', 'contrib-trans-id').val(); } - _set_transactions(transactions, trans_id=undefined) { + + _get_transaction() { return this._form_control('select', 'contrib-transaction').val(); } + _set_transaction(val) { this._form_control('select', 'contrib-transaction').val(val); } + _set_transactions(transactions, transaction=undefined) { // Keep the current selection after updating the selector in case if the // transaction belongs to this collection. - const current_id = parseInt(_.isUndefined(trans_id) ? this._get_trans_id() : trans_id); + const current_id = parseInt(_.isUndefined(transaction) ? this._get_transaction() : transaction); let in_collection = false; - this._form_control('select', 'contrib-trans-id').html( - _.reduce(transactions, (html, id) => { + this._form_control('select', 'contrib-transaction').html( + `` + + _.reduce(transactions, (html, id) => { if (id === current_id) in_collection = true; - const selected = !html ? 'selected' : ''; - return html + ``; + return html + ``; }, '') ); - if (in_collection) { - this._form_control('select', 'contrib-trans-id').val(current_id); - } - } - _reset_contrib_filter() { - this._form_control('select', 'contrib-worker').val(''); - this._form_control('select', 'contrib-table').val(''); - this._form_control('input', 'contrib-chunk').val(''); - this._form_control('select', 'contrib-overlap').val(''); - this._form_control('select', 'contrib-async').val(''); - this._form_control('select', 'contrib-status').val(''); - this._form_control('select', 'contrib-stage').val(''); - } - _disable_controls(disable) { - this.fwk_app_container.find(".form-control").prop('disabled', disable); + if (in_collection) this._set_transaction(current_id); } + + _set_num_workers(val) { this._form_control('input', 'contrib-num-workers').val(val + ' / ' + this._max_num_workers); } + _set_num_tables(val) { this._form_control('input', 'contrib-num-tables').val(val + ' / ' + this._max_num_tables); } + _set_num_trans(val) { this._form_control('input', 'contrib-num-trans').val(val + ' / ' + this._max_num_trans); } + _set_num_select(val) { this._form_control('input', 'contrib-num-select').val(val + ' / ' + this._get_max_entries()); } + + _get_min_retries() { return this._form_control('input', 'contrib-min-retries').val(); } + _set_min_retries(val) { this._form_control('input', 'contrib-min-retries').val(val); } + + _get_min_warnings() { return this._form_control('input', 'contrib-min-warnings').val(); } + _set_min_warnings(val) { this._form_control('input', 'contrib-min-warnings').val(val); } + + _get_max_entries() { return this._form_control('select', 'max-entries').val(); } + _set_max_entries(val) { this._form_control('select', 'max-entries').val(val); } + _get_worker() { return this._form_control('select', 'contrib-worker').val(); } - _set_workers(workers, val) { - let html = ``; - for (let worker in workers) { - html += ``; - } - this._form_control('select', 'contrib-worker').html(html).val(val); + _set_worker(val) { this._form_control('select', 'contrib-worker').val(val); } + _set_workers(workers, worker=undefined) { + // Keep the current selection after updating the selector in case if the + // worker belongs to this collection. + const current_worker = _.isUndefined(worker) ? this._get_worker() : worker; + let in_collection = false; + this._form_control('select', 'contrib-worker').html( + `` + + _.reduce(workers, (html, info) => { + if (info.name === current_worker) in_collection = true; + return html + ``; + }, '') + ); + if (in_collection) this._set_worker(current_worker); } + _get_table() { return this._form_control('select', 'contrib-table').val(); } - _set_tables(tables, val) { - let html = ``; - for (let table in tables) { - html += ``; - } - this._form_control('select', 'contrib-table').html(html).val(val); + _set_table(val) { this._form_control('select', 'contrib-table').val(val); } + _set_tables(tables, table=undefined) { + // Keep the current selection after updating the selector in case if the + // table belongs to this collection. + const current_table = _.isUndefined(table) ? this._get_table() : table; + let in_collection = false; + this._form_control('select', 'contrib-table').html( + `` + + _.reduce(tables, (html, info) => { + if (info.name === current_table) in_collection = true; + return html + ``; + }, '') + ); + if (in_collection) this._set_table(current_table); + } + + _get_chunk() { + const val = this._form_control('input', 'contrib-chunk').val(); + return _.isEmpty(val) ? '-1' : val; } - _get_chunk() { return this._form_control('input', 'contrib-chunk').val(); } + _set_chunk(val) { this._form_control('input', 'contrib-chunk').val(val); } + _get_overlap() { return this._form_control('select', 'contrib-overlap').val(); } + _set_overlap(val) { this._form_control('select', 'contrib-overlap').val(val); } + _get_async() { return this._form_control('select', 'contrib-async').val(); } + _set_async(val) { this._form_control('select', 'contrib-async').val(val); } + _get_status() { return this._form_control('select', 'contrib-status').val(); } + _set_status(val) { this._form_control('select', 'contrib-status').val(val); } + _get_stage() { return this._form_control('select', 'contrib-stage').val(); } + _set_stage(val) { this._form_control('select', 'contrib-stage').val(val); } + _disable_stage(yes) { this._form_control('select', 'contrib-stage').prop('disabled', yes); } + _get_sort_by_column() { return this._form_control('select', 'contrib-sort-column').val(); } _get_sort_order() { return this._form_control('select', 'contrib-sort-order').val(); } _update_interval_sec() { return this._form_control('select', 'update-interval').val(); } - _load() { + _reset() { + this._set_worker(''); + this._set_table(''); + this._set_transaction(''); + this._set_chunk(''); + this._set_overlap(''); + this._set_async(''); + this._set_status('IN_PROGRESS'); + this._set_stage('!QUEUED'); + this._disable_stage(false); + this._set_min_retries(0); + this._set_min_warnings(0); + this._set_max_entries(200); + } + _disable_controls(disable) { + this.fwk_app_container.find(".form-control.loader").prop('disabled', disable); + } + + _load(worker, database, table, transaction) { if (this._loading === undefined) this._loading = false; if (this._loading) return; this._loading = true; this._status().addClass('updating'); this._disable_controls(true); - this._load_databases(this._get_database_status()); + this._load_workers(worker, database, table, transaction); } - _load_databases(status) { + _load_workers(worker, database, table, transaction) { Fwk.web_service_GET( "/replication/config", {version: Common.RestAPIVersion}, (data) => { if (!data.success) { this._on_failure(data.error); - return; + } else { + this._max_num_workers = _.size(data.config.workers); + this._set_workers(data.config.workers, worker); + this._load_databases(database, table, transaction); + } + }, + (msg) => { this._on_failure(msg); } + ); + } + _load_databases(database, table, transaction) { + Fwk.web_service_GET( + "/replication/config", + {version: Common.RestAPIVersion}, + (data) => { + if (!data.success) { + this._on_failure(data.error); + } else { + this._set_databases(data.config.databases, database, table); + this._load_transactions(transaction); } - this._set_databases(_.map( - _.filter( - data.config.databases, - function (info) { - return (status === "") || - ((status === "PUBLISHED") && info.is_published) || - ((status === "INGESTING") && !info.is_published); - } - ), - function (info) { return info.database; } - )); - this._load_transactions(); }, (msg) => { this._on_failure(msg); } ); } - _load_transactions() { + _load_transactions(transaction) { const current_database = this._get_database(); if (!current_database) { this._on_failure("No databases found in this status category"); @@ -402,86 +502,45 @@ function(CSSLoader, } Fwk.web_service_GET( "/ingest/trans", - {database: current_database, contrib: 1, contrib_long: 0, version: Common.RestAPIVersion}, + {database: current_database, contrib: 0, contrib_long: 0, version: Common.RestAPIVersion}, (data) => { if (!data.success) { this._on_failure(data.error); return; } // Transactions are shown sorted in the ASC order - this._set_transactions(_.map( - _.sortBy( - data.databases[current_database].transactions, + this._set_transactions( + _.map( + _.sortBy(data.databases[current_database].transactions, function (info) { return info.id; }), function (info) { return info.id; } ), - function (info) { return info.id; } - )); + transaction + ); this._load_contribs(); - }, (msg) => { this._on_failure(msg); } ); } _load_contribs() { - const current_id = this._get_trans_id(); - if (!current_id) { - this._on_failure("No transactions exist for selected database"); - return; - } Fwk.web_service_GET( - "/ingest/trans/" + current_id, - {contrib: 1, contrib_long: 1, version: Common.RestAPIVersion}, + "/ingest/trans/" + this._get_transaction(), + { contrib: 1, + database: this._get_database(), + table: this._get_table(), + worker: this._get_worker(), + chunk: this._get_chunk(), + contrib_status: this._get_status(), + contrib_long: 1, + min_retries: this._get_min_retries(), + min_warnings: this._get_min_warnings(), + max_entries: this._get_max_entries(), + version: Common.RestAPIVersion}, (data) => { if (!data.success) { this._status().html('No such transaction'); this._table().children('tbody').html(''); } else { - const MiB = 1024 * 1024; - // There should be just one database in the collection. - for (let database in data.databases) { - this._data = data.databases[database].transactions[0]; - // The sort order needs to be reset to allow pre-sorting the new data the first - // time it will get displayed. - this._prev_sort_by_column = undefined; - this._prev_sort_order = undefined; - const workers = {}; - const tables = {}; - for (let i in this._data.contrib.files) { - // INMPORTANT: using 'var' instead of 'let' to allow modifying the content - // of the contributions in the original collection. Otherwise mods would - // ba made to a local copy of the contribution descriptor which has the life - // expectancy not exceeding the body the body of the loop. - var file = this._data.contrib.files[i]; - // Compute the 'stage' attribute of the IN_PROGRESS contribution requests - // based on the timestamps. - if (file.status === 'IN_PROGRESS') { - if (!file.start_time) file.stage = '1:QUEUED'; - else if (!file.read_time) file.stage = '2:READING_DATA'; - else if (!file.load_time) file.stage = '3:LOADING_MYSQL'; - } else { - file.stage = ''; - } - // Compute intervals (put the large numbers for the missing timestamps) - file.create2start = file.create_time && file.start_time ? file.start_time - file.create_time : file.create_time; - file.start2read = file.start_time && file.read_time ? file.read_time - file.start_time : file.create_time; - file.read2load = file.read_time && file.load_time ? file.load_time - file.read_time : file.create_time; - // Compute the I/O performance counters - file.io_read = 0; - file.io_load = 0; - if (file.status === 'FINISHED') { - let readSec = (file.read_time - file.start_time) / 1000.; - let loadSec = (file.load_time - file.read_time) / 1000.; - file.io_read = readSec > 0 ? (file.num_bytes / MiB) / readSec : 0; - file.io_load = loadSec > 0 ? (file.num_bytes / MiB) / loadSec : 0; - } - workers[file.worker] = 1; - tables[file.table] = 1; - } - this._set_workers(workers, this._get_worker()); - this._set_tables(tables, this._get_table()); - this._display(this._data); - break; - } + this._on_contrib_loaded(data); Fwk.setLastUpdate(this._status()); } this._status().removeClass('updating'); @@ -498,62 +557,105 @@ function(CSSLoader, this._disable_controls(false); this._loading = false; } - /** - * Render the data received from a server - * @param {Object} info transaction descriptor - */ - _display(info) { + _on_contrib_loaded(data) { + // Preprocess and cache the data in the form suitable for the display. + // + // IMPORTANT: using 'var' instead of 'let' to allow modifying the content + // of the contributions in the original collection. Otherwise modifications would + // be made to a local copy of the contribution descriptor which has the life + // expectancy not exceeding the body the body of the enclosing block. + const MiB = 1024 * 1024; + this._files = []; + + // The unique scopes for contribution. + let unique_workers = {}; + let unique_tables = {}; + let unique_trans = {}; + + // There should be just one database in the collection. + var database_data = data.databases[this._get_database()]; + this._max_num_trans = _.size(database_data.transactions); + + for (let i in database_data.transactions) { + var contrib = database_data.transactions[i].contrib; + const trans_id = database_data.transactions[i].id; + // The sort order needs to be reset to allow pre-sorting the new data the first + // time it will get displayed. + this._prev_sort_by_column = undefined; + this._prev_sort_order = undefined; + + for (let i in contrib.files) { + var file = contrib.files[i]; + file.trans_id = trans_id; + // Count contriutions scopes only if they are not already in the collection. + unique_workers[file.worker] = true; + unique_tables[file.table] = true; + unique_trans[trans_id] = true; + // Compute the 'stage' attribute of the IN_PROGRESS contribution requests + // based on the timestamps. + if (file.status === 'IN_PROGRESS') { + if (!file.start_time) file.stage = 'QUEUED'; + else if (!file.read_time) file.stage = 'READING_DATA'; + else if (!file.load_time) file.stage = 'LOADING_MYSQL'; + } else { + file.stage = ''; + } + // Compute intervals (put the large numbers for the missing timestamps) + file.create2start = file.create_time && file.start_time ? file.start_time - file.create_time : file.create_time; + file.start2read = file.start_time && file.read_time ? file.read_time - file.start_time : file.create_time; + file.read2load = file.read_time && file.load_time ? file.load_time - file.read_time : file.create_time; + // Compute the I/O performance counters + file.io_read = 0; + file.io_load = 0; + if (file.status === 'FINISHED') { + let readSec = (file.read_time - file.start_time) / 1000.; + let loadSec = (file.load_time - file.read_time) / 1000.; + file.io_read = readSec > 0 ? (file.num_bytes / MiB) / readSec : 0; + file.io_load = loadSec > 0 ? (file.num_bytes / MiB) / loadSec : 0; + } + } + this._files = this._files.concat(contrib.files); + } + this._set_num_workers(_.size(unique_workers)); + this._set_num_tables(_.size(unique_tables)); + this._set_num_trans(_.size(unique_trans)); + this._set_num_select(_.size(this._files)); + this._display(); + } + _display() { // Sort if the first time displaying the data, or if the sort order has changed // since the previous run of the display. const sort_by_column = this._get_sort_by_column(); const sort_order = this._get_sort_order(); if (_.isUndefined(this._prev_sort_by_column) || (this._prev_sort_by_column !== sort_by_column) || _.isUndefined(this._prev_sort_order) || (this._prev_sort_order !== sort_order)) { - info.contrib.files = _.sortBy(info.contrib.files, sort_by_column); - if (sort_order === 'DESC') info.contrib.files.reverse(); + this._files = _.sortBy(this._files, sort_by_column); + if (sort_order === 'DESC') this._files.reverse(); this._prev_sort_by_column = sort_by_column; this._prev_sort_order = sort_order; } const database = this._get_database(); - const worker = this._get_worker(); - const workerIsSet = worker !== ''; - - const table = this._get_table(); - const tableIsSet = table !== ''; - - const chunkIsSet = this._get_chunk() !== ''; - const chunk = chunkIsSet ? parseInt(this._get_chunk()) : ''; const overlapIsSet = this._get_overlap() !== ''; const overlap = overlapIsSet ? parseInt(this._get_overlap()) : ''; const asyncIsSet = this._get_async() !== ''; const async = asyncIsSet ? parseInt(this._get_async()) : ''; - - const status = this._get_status(); - const statusNotFinishedIsSet = status == '!FINISHED'; - const statusIsSet = status !== ''; - + const stage = this._get_stage(); const stageIsSet = stage !== ''; - let numSelect = 0; let html = ''; - for (let idx in info.contrib.files) { - var file = info.contrib.files[idx]; + for (let idx in this._files) { + var file = this._files[idx]; // Apply optional content filters - if (workerIsSet && file.worker !== worker) continue; - if (tableIsSet && file.table !== table) continue; - if (chunkIsSet && file.chunk !== chunk) continue; if (overlapIsSet && file.overlap !== overlap) continue; if (asyncIsSet && file.async !== async) continue; - if (statusIsSet) { - if (statusNotFinishedIsSet) { - if (file.status === 'FINISHED') continue; - } else if (file.status !== status) continue; + if (stageIsSet && (file.status === 'IN_PROGRESS')) { + if (stage === '!QUEUED') { + if ((file.stage !== 'READING_DATA') && (file.stage !== 'LOADING_MYSQL')) continue; + } else if (file.stage !== stage) continue; } - if (stageIsSet && (file.status === 'IN_PROGRESS') && (file.stage !== stage)) continue; - numSelect++; const overlapStr = file.overlap ? 1 : 0; const asyncStr = file.async ? 'ASYNC' : 'SYNC'; let statusCssClass = ''; @@ -577,6 +679,7 @@ function(CSSLoader,
${file.id}
${file.worker}
+
${file.trans_id}
${file.table}
${file.chunk}
${overlapStr}
@@ -616,7 +719,6 @@ function(CSSLoader, Fwk.find("Ingest", "Contribution Info").set_contrib_id(id); Fwk.show("Ingest", "Contribution Info"); }); - this._set_num_select(numSelect, info.contrib.files.length); } } return IngestContributions; diff --git a/src/www/qserv/js/IngestStatus.js b/src/www/qserv/js/IngestStatus.js index 1ccfe6585..911ea7ecc 100644 --- a/src/www/qserv/js/IngestStatus.js +++ b/src/www/qserv/js/IngestStatus.js @@ -205,6 +205,8 @@ function(CSSLoader, */ _display(databaseInfo) { + const contribInspectTitle = 'Click to see contributions made in a scope of the relevant transactions'; + let html = ''; const database = this._get_database(); @@ -445,7 +447,8 @@ function(CSSLoader, - + + @@ -456,7 +459,8 @@ function(CSSLoader, - + + `; @@ -466,6 +470,9 @@ function(CSSLoader, let attentionCssClass4warnings = tableStats[table].num_warnings === 0 ? '' : 'table-danger'; html += ` + @@ -479,7 +486,8 @@ function(CSSLoader, - + + `; @@ -489,6 +497,9 @@ function(CSSLoader, let attentionCssClass4warnings = workerStats[worker].num_warnings === 0 ? '' : 'table-danger'; html += ` + @@ -505,14 +516,29 @@ function(CSSLoader, `; - this._database().html(html).find("pre.database_table").click((e) => { + var tbody = this._database().html(html); + tbody.find("pre.database_table").click((e) => { const elem = $(e.currentTarget); const database = elem.attr("database"); const table = elem.attr("table"); Fwk.show("Replication", "Schema"); Fwk.current().loadSchema(database, table); }); - } + tbody.find("button.contrib_table").click((e) => { + const worker = undefined; + const table = $(e.currentTarget).attr("id"); + const transactionId = undefined; + Fwk.find("Ingest", "Contributions").search(worker, this._get_database(), table, transactionId); + Fwk.show("Ingest", "Contributions"); + }); + tbody.find("button.contrib_worker").click((e) => { + const worker = $(e.currentTarget).attr("id"); + const table = undefined; + const transactionId = undefined; + Fwk.find("Ingest", "Contributions").search(worker, this._get_database(), table, transactionId); + Fwk.show("Ingest", "Contributions"); + }); + } static timeAgo(timestamp) { let ivalSec = Fwk.now().sec - Math.floor(timestamp / 1000); diff --git a/src/www/qserv/js/IngestTransactions.js b/src/www/qserv/js/IngestTransactions.js index a407f38c2..fa78c2300 100644 --- a/src/www/qserv/js/IngestTransactions.js +++ b/src/www/qserv/js/IngestTransactions.js @@ -67,6 +67,23 @@ function(CSSLoader, +
+ + +
${Common.html_update_ival('update-interval')}
@@ -76,7 +93,8 @@ function(CSSLoader,
 Context Data [GB] Rows Rows loaded
Table Tables:
+ +
${table}
${tableStats[table].data.toFixed(2)}
${tableStats[table].num_rows}
Worker Workers:
+ +
${worker}
${workerStats[worker].data.toFixed(2)}
${workerStats[worker].num_rows}
- + + @@ -145,6 +163,8 @@ function(CSSLoader, ); if (in_collection && current_database) this._set_database(current_database); } + _get_state() { return this._form_control('select', 'trans-state').val(); } + _set_state(val) { this._form_control('select', 'trans-state').val(val); } _disable_selectors(disable) { this.fwk_app_container.find(".form-control-view").prop('disabled', disable); } @@ -203,7 +223,12 @@ function(CSSLoader, } Fwk.web_service_GET( "/ingest/trans", - {database: current_database, contrib: 1, contrib_long: 0, version: Common.RestAPIVersion}, + { database: current_database, + trans_state: this._get_state(), + contrib: 1, + contrib_long: 0, + version: Common.RestAPIVersion + }, (data) => { if (!data.success) { this._on_failure(data.error); @@ -226,6 +251,8 @@ function(CSSLoader, this._loading = false; } _display(databaseInfo) { + const transInspectTitle = 'Click to inspect transaction events log'; + const contribInspectTitle = 'Click to see contributions made in a scope of the transaction'; let html = ''; if (databaseInfo.transactions.length === 0) { html += ` @@ -284,9 +311,11 @@ function(CSSLoader, let attentionCssClass4warnings = numWarnings === 0 ? '' : 'table-danger'; return html + ` - + @@ -323,8 +352,10 @@ function(CSSLoader, ); tbody.find("button.contrib").click( (e) => { + const worker = undefined; + const table = undefined; const transactionId = $(e.currentTarget).attr("id"); - Fwk.find("Ingest", "Contributions").set_transaction(this._get_database_status(), this._databases, this._get_database(), this._transactions, transactionId); + Fwk.find("Ingest", "Contributions").search(worker, this._get_database(), table, transactionId); Fwk.show("Ingest", "Contributions"); } );
More... Id State Timing
- - + + + +
${info.id}
${info.state}