Skip to content

Commit

Permalink
Merge branch 'tickets/DM-43602'
Browse files Browse the repository at this point in the history
  • Loading branch information
iagaponenko committed Apr 4, 2024
2 parents fd9c7dc + b66d0eb commit 3214c1e
Show file tree
Hide file tree
Showing 12 changed files with 12 additions and 221 deletions.
1 change: 0 additions & 1 deletion admin/local/docker/compose/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,6 @@ services:
--registry-host=repl-mgr-registry
--controller-auto-register-workers=1
--qserv-sync-force
--qserv-chunk-map-update
--debug
expose:
- "25081"
Expand Down
9 changes: 0 additions & 9 deletions src/qmeta/Exceptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,15 +109,6 @@ class MissingTableError : public QMetaError {
virtual std::string typeName() const override { return "MissingTableError"; }
};

/// Exception thrown when the specified metadata table is empty.
class EmptyTableError : public QMetaError {
public:
EmptyTableError(util::Issue::Context const& ctx, std::string const& table)
: QMetaError(ctx, "Query metadata table is empty: " + table) {}

virtual std::string typeName() const override { return "EmptyTableError"; }
};

/// Exception thrown when database consistency is violated.
class ConsistencyError : public QMetaError {
public:
Expand Down
40 changes: 1 addition & 39 deletions src/qmeta/QMeta.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,12 @@
#define LSST_QSERV_QMETA_QMETA_H

// System headers
#include <chrono>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Third party headers
#include "nlohmann/json.hpp"

// Qserv headers
#include "global/intTypes.h"
#include "qmeta/QInfo.h"
Expand All @@ -50,6 +46,7 @@ namespace lsst::qserv::qmeta {

/**
* @ingroup qmeta
*
* @brief Interface for query metadata.
*/

Expand All @@ -61,36 +58,6 @@ class QMeta {
*/
typedef std::vector<std::pair<std::string, std::string> > TableNames;

/**
 * ChunkMap pairs a disposition of chunks across Qserv workers with the time
 * when that disposition was last updated.
 *
 * The disposition is a JSON object of the following schema:
 * @code
 *   {<worker>:{
 *      <database>:{
 *        <table>:[
 *          [<chunk>,<size>],
 *          ...
 *        ]
 *      }
 *    }
 *   }
 * @endcode
 * Where:
 *   <worker>    - the unique identifier of a worker
 *   <database>  - the name of a database
 *   <table>     - the name of a table
 *   <chunk>     - the chunk number
 *   <size>      - the number of bytes in the chunk table
 */
struct ChunkMap {
    /// The disposition of chunks (see the schema above).
    nlohmann::json chunks;

    /// The time of the most recent update of the map (counted from the UNIX Epoch).
    std::chrono::system_clock::time_point updateTime;
};

/**
* Create QMeta instance from configuration dictionary.
*
Expand Down Expand Up @@ -320,11 +287,6 @@ class QMeta {
/// Write messages/errors generated during the query to the QMessages table.
virtual void addQueryMessages(QueryId queryId, std::shared_ptr<qdisp::MessageStore> const& msgStore) = 0;

/// Fetch the most recent chunk disposition map along with the time of its last update.
/// @return the most current chunk disposition
/// @throws EmptyTableError if the corresponding metadata table doesn't have any record
/// @throws ConsistencyError if the stored record is found inconsistent or can't be parsed
///   (as reported by the MySQL-based implementation)
/// @throws SqlError for any other error related to MySQL
virtual ChunkMap getChunkMap() = 0;

protected:
// Default constructor
QMeta() {}
Expand Down
43 changes: 1 addition & 42 deletions src/qmeta/QMetaMysql.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

// System headers
#include <algorithm>
#include <stdexcept>

// Third-party headers
#include "boost/lexical_cast.hpp"
Expand All @@ -48,7 +47,7 @@ using namespace std;
namespace {

// Current version of QMeta schema
char const VERSION_STR[] = "10";
char const VERSION_STR[] = "9";

LOG_LOGGER _log = LOG_GET("lsst.qserv.qmeta.QMetaMysql");

Expand Down Expand Up @@ -841,46 +840,6 @@ void QMetaMysql::addQueryMessages(QueryId queryId, shared_ptr<qdisp::MessageStor
}
}

/**
 * Fetch the most recent chunk disposition map from the table `chunkMap`.
 *
 * The update time is requested via UNIX_TIMESTAMP() so that it arrives as
 * a number of seconds since the UNIX Epoch, which is what the conversion below
 * expects. A raw TIMESTAMP value is normally delivered as text of the form
 * 'YYYY-MM-DD hh:mm:ss', from which stol() would silently pick up the leading
 * year only and produce a wrong time point without reporting any error.
 *
 * @return the chunk map and the time of its last update
 * @throws EmptyTableError if the table has no rows
 * @throws ConsistencyError if more than one row is returned or the row can't be parsed
 * @throws SqlError if the query fails or its result set can't be extracted
 */
QMeta::ChunkMap QMetaMysql::getChunkMap() {
    lock_guard<mutex> sync(_dbMutex);

    auto trans = QMetaTransaction::create(*_conn);

    sql::SqlErrorObject errObj;
    sql::SqlResults results;
    string const tableName = "chunkMap";
    // Only the latest version of the map is of interest, hence ORDER BY ... DESC LIMIT 1.
    string const query = "SELECT `chunks`,UNIX_TIMESTAMP(`update_time`) FROM `" + tableName +
                         "` ORDER BY `update_time` DESC LIMIT 1";
    LOGS(_log, LOG_LVL_DEBUG, "Executing query: " << query);
    if (!_conn->runQuery(query, results, errObj)) {
        LOGS(_log, LOG_LVL_ERROR, "query failed: " << query);
        throw SqlError(ERR_LOC, errObj);
    }
    vector<string> chunks;
    vector<string> updateTime;
    if (!results.extractFirst2Columns(chunks, updateTime, errObj)) {
        LOGS(_log, LOG_LVL_ERROR, "Failed to extract result set of query " + query);
        throw SqlError(ERR_LOC, errObj);
    }
    trans->commit();

    if (chunks.empty()) {
        throw EmptyTableError(ERR_LOC, tableName);
    } else if (chunks.size() > 1) {
        throw ConsistencyError(ERR_LOC, "Too many rows in result set of query " + query);
    }
    QMeta::ChunkMap chunkMap;
    try {
        chunkMap.chunks = nlohmann::json::parse(chunks[0]);
        // The second column carries the number of seconds since the UNIX Epoch.
        chunkMap.updateTime =
                chrono::time_point<chrono::system_clock>() + chrono::seconds(stol(updateTime[0]));
    } catch (exception const& ex) {
        string const msg = "Failed to parse result set of query " + query + ", ex: " + string(ex.what());
        throw ConsistencyError(ERR_LOC, msg);
    }
    return chunkMap;
}

void QMetaMysql::_addQueryMessage(QueryId queryId, qdisp::QueryMessage const& qMsg, int& cancelCount,
int& completeCount, int& execFailCount, map<string, ManyMsg>& msgCountMap) {
// Don't add duplicate messages.
Expand Down
4 changes: 1 addition & 3 deletions src/qmeta/QMetaMysql.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ namespace lsst::qserv::qmeta {

/**
* @ingroup qmeta
*
* @brief Mysql-based implementation of qserv metadata.
*/

Expand Down Expand Up @@ -262,9 +263,6 @@ class QMetaMysql : public QMeta {
/// @see QMeta::addQueryMessages()
void addQueryMessages(QueryId queryId, std::shared_ptr<qdisp::MessageStore> const& msgStore) override;

/// @see QMeta::getChunkMap
QMeta::ChunkMap getChunkMap() override;

protected:
/// Check that all necessary tables exist
void _checkDb();
Expand Down
5 changes: 0 additions & 5 deletions src/qmeta/schema/migrate-9-to-10.sql

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -207,16 +207,6 @@ CREATE TABLE IF NOT EXISTS `QMessages` (
ENGINE = InnoDB
COMMENT = 'Table of messages generated during queries.';

-- -----------------------------------------------------
-- Table `chunkMap`
--
-- Each row stores a collection of chunk replicas at workers serialized
-- in the JSON format (`chunks`) together with the time of the most recent
-- update of the map (`update_time`, defaults to the time of the insert).
--
-- NOTE(review): a MySQL BLOB column holds at most 65,535 bytes. Confirm that
-- a serialized map can never exceed this limit, otherwise a larger type
-- (MEDIUMBLOB or LONGBLOB) would be required.
-- -----------------------------------------------------

CREATE TABLE IF NOT EXISTS `chunkMap` (
`chunks` BLOB NOT NULL COMMENT 'A collection of chunk replicas at workers in the JSON format',
`update_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'The most recent update time of the map')
ENGINE = InnoDB
COMMENT = 'Chunk disposition across workers';

-- Update version on every schema change.
-- Version 0 corresponds to initial QMeta release and it had no
-- QMetadata table at all.
Expand All @@ -229,6 +219,4 @@ COMMENT = 'Chunk disposition across workers';
-- Version 7 added final row count to QInfo.
-- Version 8 replaced INT with BIGINT in the byte and row counter columns of QInfo.
-- Version 9 removed the full-text index on the query text from QInfo.
-- Version 10 added the worker-to-chunk map table chunkMap

INSERT INTO `QMetadata` (`metakey`, `value`) VALUES ('version', '10');
INSERT INTO `QMetadata` (`metakey`, `value`) VALUES ('version', '9');
6 changes: 0 additions & 6 deletions src/qmeta/testQMeta.cc
Original file line number Diff line number Diff line change
Expand Up @@ -413,10 +413,4 @@ BOOST_AUTO_TEST_CASE(messWithQueryStats) {
BOOST_CHECK(caught);
}

// Fetching the chunk map is expected to report the empty table via EmptyTableError.
// The test assumes that the underlying table exists and that it's empty.
BOOST_AUTO_TEST_CASE(getChunkMap) {
    BOOST_CHECK_THROW(qMeta->getChunkMap(), EmptyTableError);
}

BOOST_AUTO_TEST_SUITE_END()
6 changes: 1 addition & 5 deletions src/replica/apps/MasterControllerHttpApp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,6 @@ MasterControllerHttpApp::MasterControllerHttpApp(int argc, char* argv[])
" This affect replicas to be deleted from the workers during the synchronization"
" stages.",
_forceQservSync);
parser().flag("qserv-chunk-map-update",
"The flag which would result in updating the chunk disposition map"
" in Qserv's QMeta database.",
_qservChunkMapUpdate);
parser().flag("purge",
"The binary flag which, if provided, enables the 'purge' algorithm in"
" the end of each replication cycle that eliminates excess replicas which"
Expand Down Expand Up @@ -202,7 +198,7 @@ int MasterControllerHttpApp::runImpl() {

_replicationTask = ReplicationTask::create(
_controller, [self](Task::Ptr const& ptr) { self->_isFailed.fail(); }, _qservSyncTimeoutSec,
_forceQservSync, _qservChunkMapUpdate, _replicationIntervalSec, _purge);
_forceQservSync, _replicationIntervalSec, _purge);
_replicationTask->start();

_healthMonitorTask = HealthMonitorTask::create(
Expand Down
1 change: 0 additions & 1 deletion src/replica/apps/MasterControllerHttpApp.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ class MasterControllerHttpApp : public Application {

bool _purge;
bool _forceQservSync;
bool _qservChunkMapUpdate;
bool _permanentDelete;

/// A connection URL for the MySQL service of the Qserv master database.
Expand Down
87 changes: 3 additions & 84 deletions src/replica/contr/ReplicationTask.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,37 +22,23 @@
// Class header
#include "replica/contr/ReplicationTask.h"

// System headers
#include <vector>

// Third party headers
#include "nlohmann/json.hpp"

// Qserv headers
#include "replica/config/Configuration.h"
#include "replica/jobs/FindAllJob.h"
#include "replica/jobs/FixUpJob.h"
#include "replica/jobs/ReplicateJob.h"
#include "replica/jobs/RebalanceJob.h"
#include "replica/jobs/PurgeJob.h"
#include "replica/mysql/DatabaseMySQL.h"
#include "replica/services/DatabaseServices.h"
#include "replica/util/ReplicaInfo.h"

using namespace std;
using json = nlohmann::json;

namespace lsst::qserv::replica {

using namespace database::mysql;

ReplicationTask::Ptr ReplicationTask::create(Controller::Ptr const& controller,
Task::AbnormalTerminationCallbackType const& onTerminated,
unsigned int qservSyncTimeoutSec, bool forceQservSync,
bool qservChunkMapUpdate, unsigned int replicationIntervalSec,
bool purge) {
unsigned int replicationIntervalSec, bool purge) {
return Ptr(new ReplicationTask(controller, onTerminated, qservSyncTimeoutSec, forceQservSync,
qservChunkMapUpdate, replicationIntervalSec, purge));
replicationIntervalSec, purge));
}

bool ReplicationTask::onRun() {
Expand All @@ -65,8 +51,6 @@ bool ReplicationTask::onRun() {
launch<FindAllJob>(priority, saveReplicaInfo, allWorkers);
sync(_qservSyncTimeoutSec, _forceQservSync);

if (_qservChunkMapUpdate) _updateChunkMap();

launch<FixUpJob>(priority);
sync(_qservSyncTimeoutSec, _forceQservSync);

Expand All @@ -89,75 +73,10 @@ bool ReplicationTask::onRun() {
ReplicationTask::ReplicationTask(Controller::Ptr const& controller,
Task::AbnormalTerminationCallbackType const& onTerminated,
unsigned int qservSyncTimeoutSec, bool forceQservSync,
bool qservChunkMapUpdate, unsigned int replicationIntervalSec, bool purge)
unsigned int replicationIntervalSec, bool purge)
: Task(controller, "REPLICATION-THREAD ", onTerminated, replicationIntervalSec),
_qservSyncTimeoutSec(qservSyncTimeoutSec),
_forceQservSync(forceQservSync),
_qservChunkMapUpdate(qservChunkMapUpdate),
_purge(purge) {}

void ReplicationTask::_updateChunkMap() {
// Get the current status of all known chunk replica across all enabled workers,
// published databases and tables. Package this info into the JSON object of
// the following schema:
//
// {<worker>:{
// <database>:{
// <table>:[
// [<chunk>,<size>],
// ...
// ]
// }
// }

bool const allDatabases = true;
string const emptyDatabaseFilter;
bool const isPublished = true;
bool const includeFileInfo = true; // need this to access tables sizes
json chunkMap = json::object();
for (auto const& workerName : serviceProvider()->config()->workers()) {
vector<ReplicaInfo> replicas;
serviceProvider()->databaseServices()->findWorkerReplicas(replicas, workerName, emptyDatabaseFilter,
allDatabases, isPublished, includeFileInfo);
chunkMap[workerName] = json::object();
json& chunkMapWorker = chunkMap[workerName];
for (auto const& replica : replicas) {
for (auto const& fileInfo : replica.fileInfo()) {
if (fileInfo.isData() && !fileInfo.isOverlap()) {
chunkMapWorker[replica.database()][fileInfo.baseTable()].push_back(
vector<uint64_t>({replica.chunk(), fileInfo.size}));
}
}
}
}

// Open MySQL connection using the RAII-style handler that would automatically
// abort the transaction should any problem occured when loading data into the table.
ConnectionHandler h;
try {
h.conn = Connection::open(Configuration::qservCzarDbParams("qservMeta"));
} catch (exception const& ex) {
error("failed to connect to the czar's database server, ex: " + string(ex.what()));
return;
}

// Load the map into the table and purge any previous maps (if any)
QueryGenerator const g(h.conn);
string const tableName = "chunkMap";
auto const insertQuery = g.insert(tableName, chunkMap.dump(), Sql::NOW);
auto const maxUpdateTimeSubQuery =
g.subQuery(g.select(Sql::MAX_(g.id("update_time"))) + g.from(tableName));
auto const deleteQuery =
g.delete_(tableName) + g.where(g.notInSubQuery("update_time", maxUpdateTimeSubQuery));
try {
h.conn->executeInOwnTransaction([&insertQuery, &deleteQuery](auto conn) {
conn->execute(insertQuery);
conn->execute(deleteQuery);
});
} catch (exception const& ex) {
error("failed to update chunk map in the Czar database, ex: " + string(ex.what()));
return;
}
}

} // namespace lsst::qserv::replica
Loading

0 comments on commit 3214c1e

Please sign in to comment.