diff --git a/admin/local/docker/compose/docker-compose.yml b/admin/local/docker/compose/docker-compose.yml index dc5ca4880..5a3c37f99 100644 --- a/admin/local/docker/compose/docker-compose.yml +++ b/admin/local/docker/compose/docker-compose.yml @@ -415,7 +415,6 @@ services: --registry-host=repl-mgr-registry --controller-auto-register-workers=1 --qserv-sync-force - --qserv-chunk-map-update --debug expose: - "25081" diff --git a/src/qmeta/Exceptions.h b/src/qmeta/Exceptions.h index 511e1cd35..deeccf73b 100644 --- a/src/qmeta/Exceptions.h +++ b/src/qmeta/Exceptions.h @@ -109,15 +109,6 @@ class MissingTableError : public QMetaError { virtual std::string typeName() const override { return "MissingTableError"; } }; -/// Exception thrown when the specified metadata table is empty. -class EmptyTableError : public QMetaError { -public: - EmptyTableError(util::Issue::Context const& ctx, std::string const& table) - : QMetaError(ctx, "Query metadata table is empty: " + table) {} - - virtual std::string typeName() const override { return "EmptyTableError"; } -}; - /// Exception thrown when database consistency is violated. class ConsistencyError : public QMetaError { public: diff --git a/src/qmeta/QMeta.h b/src/qmeta/QMeta.h index 973ad73ad..07e6afd91 100644 --- a/src/qmeta/QMeta.h +++ b/src/qmeta/QMeta.h @@ -23,16 +23,12 @@ #define LSST_QSERV_QMETA_QMETA_H // System headers -#include #include #include #include #include #include -// Third party headers -#include "nlohmann/json.hpp" - // Qserv headers #include "global/intTypes.h" #include "qmeta/QInfo.h" @@ -50,6 +46,7 @@ namespace lsst::qserv::qmeta { /** * @ingroup qmeta + * * @brief Interface for query metadata. */ @@ -61,36 +58,6 @@ class QMeta { */ typedef std::vector > TableNames; - /** - * The structure ChunkMap encapsulates a disposition of chunks at Qserv workers - * along with a time when the map was updated. - * - * The schema of the JSON object is presented below: - * @code - * {:{ - * :{ - * :[ - * [,], - * ... - * ] - * } - * } - * @code - * Where: - * - the unique identifier of a worker - * - the name of a database - *
- the name of a table - * - the chunk number - * - the number of bytes in the chunk table - */ - struct ChunkMap { - /// The chunk disposition map. - nlohmann::json chunks; - - /// The last time the map was updated (since UNIX Epoch). - std::chrono::time_point updateTime; - }; - /** * Create QMeta instance from configuration dictionary. * @@ -320,11 +287,6 @@ class QMeta { /// Write messages/errors generated during the query to the QMessages table. virtual void addQueryMessages(QueryId queryId, std::shared_ptr const& msgStore) = 0; - /// @return Return the most current chunk disposition - /// @throws EmptyTableError if the corresponding metadata table doesn't have any record - /// @throws SqlError for any other error related to MySQL - virtual ChunkMap getChunkMap() = 0; - protected: // Default constructor QMeta() {} diff --git a/src/qmeta/QMetaMysql.cc b/src/qmeta/QMetaMysql.cc index 8b6057ec3..d669b05f8 100644 --- a/src/qmeta/QMetaMysql.cc +++ b/src/qmeta/QMetaMysql.cc @@ -25,7 +25,6 @@ // System headers #include -#include // Third-party headers #include "boost/lexical_cast.hpp" @@ -48,7 +47,7 @@ using namespace std; namespace { // Current version of QMeta schema -char const VERSION_STR[] = "10"; +char const VERSION_STR[] = "9"; LOG_LOGGER _log = LOG_GET("lsst.qserv.qmeta.QMetaMysql"); @@ -841,46 +840,6 @@ void QMetaMysql::addQueryMessages(QueryId queryId, shared_ptr sync(_dbMutex); - - auto trans = QMetaTransaction::create(*_conn); - - sql::SqlErrorObject errObj; - sql::SqlResults results; - string const tableName = "chunkMap"; - string const query = - "SELECT `chunks`,`update_time` FROM `" + tableName + "` ORDER BY `update_time` DESC LIMIT 1"; - LOGS(_log, LOG_LVL_DEBUG, "Executing query: " << query); - if (!_conn->runQuery(query, results, errObj)) { - LOGS(_log, LOG_LVL_ERROR, "query failed: " << query); - throw SqlError(ERR_LOC, errObj); - } - vector chunks; - vector upateTime; - if (!results.extractFirst2Columns(chunks, upateTime, errObj)) { - LOGS(_log, LOG_LVL_ERROR, "Failed to extract result set of query " + query); - throw SqlError(ERR_LOC, errObj); - } - trans->commit(); - - if (chunks.empty()) { - throw EmptyTableError(ERR_LOC, tableName); - } else if (chunks.size() > 1) { - throw ConsistencyError(ERR_LOC, "Too many rows in result set of query " + query); - } - QMeta::ChunkMap chunkMap; - try { - chunkMap.chunks = nlohmann::json::parse(chunks[0]); - chunkMap.updateTime = - chrono::time_point() + chrono::seconds(stol(upateTime[0])); - } catch (exception const& ex) { - string const msg = "Failed to parse result set of query " + query + ", ex: " + string(ex.what()); - throw ConsistencyError(ERR_LOC, msg); - } - return chunkMap; -} - void QMetaMysql::_addQueryMessage(QueryId queryId, qdisp::QueryMessage const& qMsg, int& cancelCount, int& completeCount, int& execFailCount, map& msgCountMap) { // Don't add duplicate messages. diff --git a/src/qmeta/QMetaMysql.h b/src/qmeta/QMetaMysql.h index f4756190d..59664c2ac 100644 --- a/src/qmeta/QMetaMysql.h +++ b/src/qmeta/QMetaMysql.h @@ -44,6 +44,7 @@ namespace lsst::qserv::qmeta { /** * @ingroup qmeta + * * @brief Mysql-based implementation of qserv metadata. */ @@ -262,9 +263,6 @@ class QMetaMysql : public QMeta { /// @see QMeta::addQueryMessages() void addQueryMessages(QueryId queryId, std::shared_ptr const& msgStore) override; - /// @see QMeta::getChunkMap - QMeta::ChunkMap getChunkMap() override; - protected: /// Check that all necessary tables exist void _checkDb(); diff --git a/src/qmeta/schema/migrate-9-to-10.sql b/src/qmeta/schema/migrate-9-to-10.sql deleted file mode 100644 index b3cae88de..000000000 --- a/src/qmeta/schema/migrate-9-to-10.sql +++ /dev/null @@ -1,5 +0,0 @@ -CREATE TABLE IF NOT EXISTS `chunkMap` ( - `chunks` BLOB NOT NULL COMMENT 'A collection of chunk replicas at workers in the JSON format', - `update_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'The most recent update time of the map') -ENGINE = InnoDB -COMMENT = 'Chunk disposition across workers'; diff --git a/src/qmeta/schema/migrate-None-to-10.sql.jinja b/src/qmeta/schema/migrate-None-to-9.sql.jinja similarity index 95% rename from src/qmeta/schema/migrate-None-to-10.sql.jinja rename to src/qmeta/schema/migrate-None-to-9.sql.jinja index 29d244b93..3fd0e7e38 100644 --- a/src/qmeta/schema/migrate-None-to-10.sql.jinja +++ b/src/qmeta/schema/migrate-None-to-9.sql.jinja @@ -207,16 +207,6 @@ CREATE TABLE IF NOT EXISTS `QMessages` ( ENGINE = InnoDB COMMENT = 'Table of messages generated during queries.'; --- ----------------------------------------------------- --- Table `chunkMap` --- ----------------------------------------------------- - -CREATE TABLE IF NOT EXISTS `chunkMap` ( - `chunks` BLOB NOT NULL COMMENT 'A collection of chunk replicas at workers in the JSON format', - `update_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'The most recent update time of the map') -ENGINE = InnoDB -COMMENT = 'Chunk disposition across workers'; - -- Update version on every schema change. -- Version 0 corresponds to initial QMeta release and it had no -- QMetadata table at all. @@ -229,6 +219,4 @@ COMMENT = 'Chunk disposition across workers'; -- Version 7 added final row count to QInfo. -- Version 8 replaced INT with BIGINT in the byte and row counter columns of QInfo. -- Version 9 removed the full-text index on the query text from QInfo. --- Version 10 added the worker-to-chunk map table chunkMap - -INSERT INTO `QMetadata` (`metakey`, `value`) VALUES ('version', '10'); +INSERT INTO `QMetadata` (`metakey`, `value`) VALUES ('version', '9'); diff --git a/src/qmeta/testQMeta.cc b/src/qmeta/testQMeta.cc index b7b49054c..3589f9b36 100644 --- a/src/qmeta/testQMeta.cc +++ b/src/qmeta/testQMeta.cc @@ -413,10 +413,4 @@ BOOST_AUTO_TEST_CASE(messWithQueryStats) { BOOST_CHECK(caught); } -BOOST_AUTO_TEST_CASE(getChunkMap) { - // The test assumes that the uderlying tables exists and it's empty. - QMeta::ChunkMap chunkMap; - BOOST_CHECK_THROW(qMeta->getChunkMap(), EmptyTableError); -} - BOOST_AUTO_TEST_SUITE_END() diff --git a/src/replica/apps/MasterControllerHttpApp.cc b/src/replica/apps/MasterControllerHttpApp.cc index 2b00107fc..462371824 100644 --- a/src/replica/apps/MasterControllerHttpApp.cc +++ b/src/replica/apps/MasterControllerHttpApp.cc @@ -147,10 +147,6 @@ MasterControllerHttpApp::MasterControllerHttpApp(int argc, char* argv[]) " This affect replicas to be deleted from the workers during the synchronization" " stages.", _forceQservSync); - parser().flag("qserv-chunk-map-update", - "The flag which would result in updating the chunk disposition map" - " in Qserv's QMeta database.", - _qservChunkMapUpdate); parser().flag("purge", "The binary flag which, if provided, enables the 'purge' algorithm in" " the end of each replication cycle that eliminates excess replicas which" @@ -202,7 +198,7 @@ int MasterControllerHttpApp::runImpl() { _replicationTask = ReplicationTask::create( _controller, [self](Task::Ptr const& ptr) { self->_isFailed.fail(); }, _qservSyncTimeoutSec, - _forceQservSync, _qservChunkMapUpdate, _replicationIntervalSec, _purge); + _forceQservSync, _replicationIntervalSec, _purge); _replicationTask->start(); _healthMonitorTask = HealthMonitorTask::create( diff --git a/src/replica/apps/MasterControllerHttpApp.h b/src/replica/apps/MasterControllerHttpApp.h index f5ef4ed02..d3e17f782 100644 --- a/src/replica/apps/MasterControllerHttpApp.h +++ b/src/replica/apps/MasterControllerHttpApp.h @@ -132,7 +132,6 @@ class MasterControllerHttpApp : public Application { bool _purge; bool _forceQservSync; - bool _qservChunkMapUpdate; bool _permanentDelete; /// A connection URL for the MySQL service of the Qserv master database. diff --git a/src/replica/contr/ReplicationTask.cc b/src/replica/contr/ReplicationTask.cc index 8bf801793..c7dbdea2d 100644 --- a/src/replica/contr/ReplicationTask.cc +++ b/src/replica/contr/ReplicationTask.cc @@ -22,37 +22,23 @@ // Class header #include "replica/contr/ReplicationTask.h" -// System headers -#include - -// Third party headers -#include "nlohmann/json.hpp" - // Qserv headers -#include "replica/config/Configuration.h" #include "replica/jobs/FindAllJob.h" #include "replica/jobs/FixUpJob.h" #include "replica/jobs/ReplicateJob.h" #include "replica/jobs/RebalanceJob.h" #include "replica/jobs/PurgeJob.h" -#include "replica/mysql/DatabaseMySQL.h" -#include "replica/services/DatabaseServices.h" -#include "replica/util/ReplicaInfo.h" using namespace std; -using json = nlohmann::json; namespace lsst::qserv::replica { -using namespace database::mysql; - ReplicationTask::Ptr ReplicationTask::create(Controller::Ptr const& controller, Task::AbnormalTerminationCallbackType const& onTerminated, unsigned int qservSyncTimeoutSec, bool forceQservSync, - bool qservChunkMapUpdate, unsigned int replicationIntervalSec, - bool purge) { + unsigned int replicationIntervalSec, bool purge) { return Ptr(new ReplicationTask(controller, onTerminated, qservSyncTimeoutSec, forceQservSync, - qservChunkMapUpdate, replicationIntervalSec, purge)); + replicationIntervalSec, purge)); } bool ReplicationTask::onRun() { @@ -65,8 +51,6 @@ bool ReplicationTask::onRun() { launch(priority, saveReplicaInfo, allWorkers); sync(_qservSyncTimeoutSec, _forceQservSync); - if (_qservChunkMapUpdate) _updateChunkMap(); - launch(priority); sync(_qservSyncTimeoutSec, _forceQservSync); @@ -89,75 +73,10 @@ bool ReplicationTask::onRun() { ReplicationTask::ReplicationTask(Controller::Ptr const& controller, Task::AbnormalTerminationCallbackType const& onTerminated, unsigned int qservSyncTimeoutSec, bool forceQservSync, - bool qservChunkMapUpdate, unsigned int replicationIntervalSec, bool purge) + unsigned int replicationIntervalSec, bool purge) : Task(controller, "REPLICATION-THREAD ", onTerminated, replicationIntervalSec), _qservSyncTimeoutSec(qservSyncTimeoutSec), _forceQservSync(forceQservSync), - _qservChunkMapUpdate(qservChunkMapUpdate), _purge(purge) {} -void ReplicationTask::_updateChunkMap() { - // Get the current status of all known chunk replica across all enabled workers, - // published databases and tables. Package this info into the JSON object of - // the following schema: - // - // {:{ - // :{ - //
:[ - // [,], - // ... - // ] - // } - // } - - bool const allDatabases = true; - string const emptyDatabaseFilter; - bool const isPublished = true; - bool const includeFileInfo = true; // need this to access tables sizes - json chunkMap = json::object(); - for (auto const& workerName : serviceProvider()->config()->workers()) { - vector replicas; - serviceProvider()->databaseServices()->findWorkerReplicas(replicas, workerName, emptyDatabaseFilter, - allDatabases, isPublished, includeFileInfo); - chunkMap[workerName] = json::object(); - json& chunkMapWorker = chunkMap[workerName]; - for (auto const& replica : replicas) { - for (auto const& fileInfo : replica.fileInfo()) { - if (fileInfo.isData() && !fileInfo.isOverlap()) { - chunkMapWorker[replica.database()][fileInfo.baseTable()].push_back( - vector({replica.chunk(), fileInfo.size})); - } - } - } - } - - // Open MySQL connection using the RAII-style handler that would automatically - // abort the transaction should any problem occured when loading data into the table. - ConnectionHandler h; - try { - h.conn = Connection::open(Configuration::qservCzarDbParams("qservMeta")); - } catch (exception const& ex) { - error("failed to connect to the czar's database server, ex: " + string(ex.what())); - return; - } - - // Load the map into the table and purge any previous maps (if any) - QueryGenerator const g(h.conn); - string const tableName = "chunkMap"; - auto const insertQuery = g.insert(tableName, chunkMap.dump(), Sql::NOW); - auto const maxUpdateTimeSubQuery = - g.subQuery(g.select(Sql::MAX_(g.id("update_time"))) + g.from(tableName)); - auto const deleteQuery = - g.delete_(tableName) + g.where(g.notInSubQuery("update_time", maxUpdateTimeSubQuery)); - try { - h.conn->executeInOwnTransaction([&insertQuery, &deleteQuery](auto conn) { - conn->execute(insertQuery); - conn->execute(deleteQuery); - }); - } catch (exception const& ex) { - error("failed to update chunk map in the Czar database, ex: " + string(ex.what())); - return; - } -} - } // namespace lsst::qserv::replica diff --git a/src/replica/contr/ReplicationTask.h b/src/replica/contr/ReplicationTask.h index 5bc99c76b..2f8191e58 100644 --- a/src/replica/contr/ReplicationTask.h +++ b/src/replica/contr/ReplicationTask.h @@ -55,7 +55,6 @@ class ReplicationTask : public Task { * @param qservSyncTimeoutSec The maximum number of seconds to be waited before giving * up on the Qserv synchronization requests. * @param forceQservSync Force chunk removal at worker resource collections if 'true'. - * @param qservChunkMapUpdate Update the chunk disposition map in Qserv's QMeta database if 'true'. * @param replicationIntervalSec The number of seconds to wait in the end of each * iteration loop before to begin the new one. * @param purge Purge excess replicas if 'true'. @@ -63,7 +62,7 @@ class ReplicationTask : public Task { */ static Ptr create(Controller::Ptr const& controller, Task::AbnormalTerminationCallbackType const& onTerminated, - unsigned int qservSyncTimeoutSec, bool forceQservSync, bool qservChunkMapUpdate, + unsigned int qservSyncTimeoutSec, bool forceQservSync, unsigned int replicationIntervalSec, bool purge); protected: @@ -73,23 +72,15 @@ class ReplicationTask : public Task { private: /// @see ReplicationTask::create() ReplicationTask(Controller::Ptr const& controller, AbnormalTerminationCallbackType const& onTerminated, - unsigned int qservSyncTimeoutSec, bool forceQservSync, bool qservChunkMapUpdate, + unsigned int qservSyncTimeoutSec, bool forceQservSync, unsigned int replicationIntervalSec, bool purge); - void _updateChunkMap(); - /// The maximum number of seconds to be waited before giving up /// on the Qserv synchronization requests. unsigned int const _qservSyncTimeoutSec; - /// Force removal at worker resource collections if 'true'. - bool const _forceQservSync; - - /// Update the chunk disposition map in Qserv's QMeta database if 'true'. - bool const _qservChunkMapUpdate; - - /// Purge excess replicas if 'true'. - bool const _purge; + bool const _forceQservSync; ///< Force removal at worker resource collections if 'true'. + bool const _purge; ///< Purge excess replicas if 'true'. }; } // namespace lsst::qserv::replica