Skip to content

Commit

Permalink
Merge branch 'tickets/DM-37973'
Browse files Browse the repository at this point in the history
  • Loading branch information
iagaponenko committed Sep 21, 2023
2 parents 6dbcb49 + dc3b3b3 commit 980d965
Show file tree
Hide file tree
Showing 83 changed files with 2,502 additions and 717 deletions.
1 change: 1 addition & 0 deletions admin/local/docker/compose/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ services:
--qserv-czar-db=mysql://root:CHANGEME@czar-db:3306/qservMeta
--log-cfg-file=/config-etc/log/log-repl-controller.cnf
--
--qserv-czar-proxy=mysql://qsmaster@czar-proxy:4040
--instance-id=qserv_proj
--auth-key=replauthkey
--admin-auth-key=repladminauthkey
Expand Down
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ add_custom_target(clang-format-check
#-----------------------------------------------------------------------------

add_subdirectory(admin)
add_subdirectory(cconfig)
add_subdirectory(ccontrol)
add_subdirectory(css)
add_subdirectory(czar)
Expand Down
2 changes: 1 addition & 1 deletion src/admin/python/lsst/qserv/admin/replicationInterface.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ def __init__(
self.repl_ctrl = urlparse(repl_ctrl_uri)
self.auth_key = auth_key
self.admin_auth_key = admin_auth_key
self.repl_api_version = 24
self.repl_api_version = 25
_log.debug(f"ReplicationInterface %s", self.repl_ctrl)

def version(self) -> str:
Expand Down
14 changes: 14 additions & 0 deletions src/cconfig/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
add_library(cconfig OBJECT)

target_sources(cconfig PRIVATE
CzarConfig.cc
)

target_include_directories(cconfig PRIVATE
${XROOTD_INCLUDE_DIRS}
)

target_link_libraries(cconfig PUBLIC
log
XrdSsiLib
)
14 changes: 7 additions & 7 deletions src/czar/CzarConfig.cc → src/cconfig/CzarConfig.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@
*/

// Class header
#include "czar/CzarConfig.h"

// System headers
#include "cconfig/CzarConfig.h"

// Third party headers
#include "XrdSsi/XrdSsiLogger.hh"
Expand All @@ -39,7 +37,7 @@

namespace {

LOG_LOGGER _log = LOG_GET("lsst.qserv.czar.CzarConfig");
LOG_LOGGER _log = LOG_GET("lsst.qserv.cconfig.CzarConfig");

void QservLogger(struct timeval const& mtime, unsigned long tID, const char* msg, int mlen) {
static log4cxx::spi::LocationInfo xrdLoc("client", "<xrdssi>", 0);
Expand All @@ -56,7 +54,7 @@ void QservLogger(struct timeval const& mtime, unsigned long tID, const char* msg
bool dummy = XrdSsiLogger::SetMCB(QservLogger, XrdSsiLogger::mcbClient);
} // namespace

namespace lsst::qserv::czar {
namespace lsst::qserv::cconfig {

std::mutex CzarConfig::_mtxOnInstance;

Expand Down Expand Up @@ -113,7 +111,9 @@ CzarConfig::CzarConfig(util::ConfigStore const& configStore)
_qdispVectMinRunningSizes(configStore.get("qdisppool.vectMinRunningSizes", "0:1:3:3")),
_qReqPseudoFifoMaxRunning(configStore.getInt("qdisppool.qReqPseudoFifoMaxRunning", 300)),
_notifyWorkersOnQueryFinish(configStore.getInt("tuning.notifyWorkersOnQueryFinish", 1)),
_notifyWorkersOnCzarRestart(configStore.getInt("tuning.notifyWorkersOnCzarRestart", 1)) {}
_notifyWorkersOnCzarRestart(configStore.getInt("tuning.notifyWorkersOnCzarRestart", 1)),
_czarStatsUpdateIvalSec(configStore.getInt("tuning.czarStatsUpdateIvalSec", 1)),
_czarStatsRetainPeriodSec(configStore.getInt("tuning.czarStatsRetainPeriodSec", 24 * 3600)) {}

std::ostream& operator<<(std::ostream& out, CzarConfig const& czarConfig) {
out << "[cssConfigMap=" << util::printable(czarConfig._cssConfigMap)
Expand All @@ -126,4 +126,4 @@ std::ostream& operator<<(std::ostream& out, CzarConfig const& czarConfig) {
return out;
}

} // namespace lsst::qserv::czar
} // namespace lsst::qserv::cconfig
26 changes: 21 additions & 5 deletions src/czar/CzarConfig.h → src/cconfig/CzarConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
* see <http://www.lsstcorp.org/LegalNotices/>.
*/

#ifndef LSST_QSERV_CZAR_CZARCONFIG_H
#define LSST_QSERV_CZAR_CZARCONFIG_H
#ifndef LSST_QSERV_CCONFIG_CZARCONFIG_H
#define LSST_QSERV_CCONFIG_CZARCONFIG_H

// System headers
#include <map>
Expand All @@ -35,7 +35,7 @@
#include "mysql/MySqlConfig.h"
#include "util/ConfigStore.h"

namespace lsst::qserv::czar {
namespace lsst::qserv::cconfig {

/**
* Provide all configuration parameters for a Qserv Czar instance
Expand Down Expand Up @@ -194,6 +194,18 @@ class CzarConfig {
/// and the newer queries.
bool notifyWorkersOnCzarRestart() const { return _notifyWorkersOnCzarRestart != 0; }

/// @return The desired sampling frequency of the Czar monitoring which is
/// based on tracking state changes in various entities. If 0 is returned by
/// the method then the monitoring will be disabled.
unsigned int czarStatsUpdateIvalSec() const { return _czarStatsUpdateIvalSec; }

/// @return The maximum retain period for keeping in memory the relevant metrics
/// captured by the Czar monitoring system. If 0 is returned by the method then
/// query history archiving will be disabled.
/// @note Setting the limit too high may be potentially result in runing onto
/// the OOM situation.
unsigned int czarStatsRetainPeriodSec() const { return _czarStatsRetainPeriodSec; }

private:
CzarConfig(util::ConfigStore const& ConfigStore);

Expand Down Expand Up @@ -240,8 +252,12 @@ class CzarConfig {
// Events sent to workers
int const _notifyWorkersOnQueryFinish; ///< Sent by cccontrol::UserQuerySelect
int const _notifyWorkersOnCzarRestart; ///< Sent by czar::Czar

// Parameters used for monitoring Czar
unsigned int const _czarStatsUpdateIvalSec; ///< Used by qdisp::Executive
unsigned int const _czarStatsRetainPeriodSec; ///< Used by qdisp::CzarStats
};

} // namespace lsst::qserv::czar
} // namespace lsst::qserv::cconfig

#endif // LSST_QSERV_CZAR_CZARCONFIG_H
#endif // LSST_QSERV_CCONFIG_CZARCONFIG_H
2 changes: 2 additions & 0 deletions src/ccontrol/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ target_sources(ccontrol PRIVATE

target_link_libraries(ccontrol PUBLIC
boost_regex
cconfig
log
parser
replica
Expand All @@ -39,6 +40,7 @@ FUNCTION(ccontrol_tests)
FOREACH(TEST IN ITEMS ${ARGV})
add_executable(${TEST} ${TEST}.cc)
target_link_libraries(${TEST} PUBLIC
cconfig
ccontrol
czar
parser
Expand Down
56 changes: 56 additions & 0 deletions src/ccontrol/MergingHandler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
// System headers
#include <algorithm>
#include <cassert>
#include <chrono>
#include <cstring>

// Third-party headers
Expand All @@ -37,11 +38,13 @@

// Qserv headers
#include "ccontrol/msgCode.h"
#include "global/clock_defs.h"
#include "global/debugUtil.h"
#include "global/MsgReceiver.h"
#include "proto/ProtoHeaderWrap.h"
#include "proto/ProtoImporter.h"
#include "proto/WorkerResponse.h"
#include "qdisp/CzarStats.h"
#include "qdisp/JobQuery.h"
#include "replica/HttpClient.h"
#include "rproc/InfileMerger.h"
Expand Down Expand Up @@ -90,13 +93,37 @@ string xrootUrl2path(string const& xrootUrl) {
throw runtime_error("MergingHandler::" + string(__func__) + " illegal file resource url: " + xrootUrl);
}

/**
* Instances of this class are used to update statistic counter on starting
* and finishing operations with the result files.
*/
class ResultFileTracker {
public:
ResultFileTracker() { lsst::qserv::qdisp::CzarStats::get()->addResultFile(); }
~ResultFileTracker() { lsst::qserv::qdisp::CzarStats::get()->deleteResultFile(); }
};

// The logging function employed by the transmit rate tracker to report
// the data transfer rates in a histogram. The histogram is used in
// the performance monitoring of the application.
lsst::qserv::TimeCountTracker<double>::CALLBACKFUNC const reportFileRecvRate =
[](lsst::qserv::TIMEPOINT start, lsst::qserv::TIMEPOINT end, double bytes, bool success) {
if (!success) return;
if (chrono::duration<double> const seconds = end - start; seconds.count() > 0) {
lsst::qserv::qdisp::CzarStats::get()->addFileReadRate(bytes / seconds.count());
}
};

bool readXrootFileResourceAndMerge(lsst::qserv::proto::Result const& result,
function<bool(char const*, uint32_t)> const& messageIsReady) {
string const context = "MergingHandler::" + string(__func__) + " ";

// Extract data from the input result object before modifying the one.
string const xrootUrl = result.fileresource_xroot();

// Track the file while the control flow is staying within the function.
ResultFileTracker const resultFileTracker;

// The algorithm will read the input file to locate result objects containing rows
// and call the provided callback for each such row.
XrdCl::File file;
Expand All @@ -118,6 +145,9 @@ bool readXrootFileResourceAndMerge(lsst::qserv::proto::Result const& result,
bool success = true;
try {
while (true) {
// This starts a timer of the data transmit rate tracker.
auto transmitRateTracker = make_unique<lsst::qserv::TimeCountTracker<double>>(reportFileRecvRate);

// Read the frame header that carries a size of the subsequent message.
uint32_t msgSizeBytes = 0;
uint32_t bytesRead = 0;
Expand Down Expand Up @@ -170,6 +200,14 @@ bool readXrootFileResourceAndMerge(lsst::qserv::proto::Result const& result,
offset += bytesRead;
bytes2read -= bytesRead;
}

// Destroying the tracker will result in stopping the tracker's timer and
// reporting the file read rate before proceeding to the merge.
transmitRateTracker->addToValue(msgSizeBytes);
transmitRateTracker->setSuccess();
transmitRateTracker.reset();

// Proceed to the result merge
success = messageIsReady(buf.get(), msgSizeBytes);
if (!success) break;
}
Expand Down Expand Up @@ -201,6 +239,12 @@ bool readHttpFileAndMerge(lsst::qserv::proto::Result const& result,
// Extract data from the input result object before modifying the one.
string const httpUrl = result.fileresource_http();

// Track the file while the control flow is staying within the function.
ResultFileTracker const resultFileTracker;

// The data transmit rate tracker is set up before reading each data message.
unique_ptr<lsst::qserv::TimeCountTracker<double>> transmitRateTracker;

// A location of the next byte to be read from the input file. The variable
// is used for error reporting.
uint64_t offset = 0;
Expand Down Expand Up @@ -256,6 +300,9 @@ bool readHttpFileAndMerge(lsst::qserv::proto::Result const& result,
msgBufSize = msgSizeBytes;
msgBuf.reset(new char[msgBufSize]);
}
// Starts the tracker to measure the performance of the network I/O.
transmitRateTracker =
make_unique<lsst::qserv::TimeCountTracker<double>>(reportFileRecvRate);
}
} else {
// Continue or finish reading the message body.
Expand All @@ -268,6 +315,15 @@ bool readHttpFileAndMerge(lsst::qserv::proto::Result const& result,
if (msgBufNext == msgSizeBytes) {
// Done reading message body.
msgBufNext = 0;

// Destroying the tracker will result in stopping the tracker's timer and
// reporting the file read rate before proceeding to the merge.
if (transmitRateTracker != nullptr) {
transmitRateTracker->addToValue(msgSizeBytes);
transmitRateTracker->setSuccess();
transmitRateTracker.reset();
}

// Parse and evaluate the message.
bool const success = messageIsReady(msgBuf.get(), msgSizeBytes);
if (!success) {
Expand Down
34 changes: 27 additions & 7 deletions src/ccontrol/UserQueryFactory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "lsst/log/Log.h"

// Qserv headers
#include "cconfig/CzarConfig.h"
#include "ccontrol/ConfigError.h"
#include "ccontrol/ConfigMap.h"
#include "ccontrol/ParseRunner.h"
Expand All @@ -51,7 +52,6 @@
#include "ccontrol/UserQueryType.h"
#include "css/CssAccess.h"
#include "css/KvInterfaceImplMem.h"
#include "czar/CzarConfig.h"
#include "mysql/MySqlConfig.h"
#include "parser/ParseException.h"
#include "qdisp/Executive.h"
Expand Down Expand Up @@ -151,7 +151,7 @@ bool qmetaHasDataForSelectCountStarQuery(query::SelectStmt::Ptr const& stmt,
auto const& fromTable = tableRefPtr->getTable();
rowsTable = fromDb + "__" + fromTable + "__rows";
// TODO consider using QMetaSelect instead of making a new connection.
auto cnx = sql::SqlConnectionFactory::make(czar::CzarConfig::instance()->getMySqlQmetaConfig());
auto cnx = sql::SqlConnectionFactory::make(cconfig::CzarConfig::instance()->getMySqlQmetaConfig());
sql::SqlErrorObject err;
auto tableExists = cnx->tableExists(rowsTable, err);
LOGS(_log, LOG_LVL_DEBUG,
Expand All @@ -162,7 +162,7 @@ bool qmetaHasDataForSelectCountStarQuery(query::SelectStmt::Ptr const& stmt,

std::shared_ptr<UserQuerySharedResources> makeUserQuerySharedResources(
std::shared_ptr<qproc::DatabaseModels> const& dbModels, std::string const& czarName) {
std::shared_ptr<czar::CzarConfig> const czarConfig = czar::CzarConfig::instance();
auto const czarConfig = cconfig::CzarConfig::instance();
return std::make_shared<UserQuerySharedResources>(
css::CssAccess::createFromConfig(czarConfig->getCssConfigMap(), czarConfig->getEmptyChunkPath()),
czarConfig->getMySqlResultConfig(),
Expand All @@ -178,8 +178,9 @@ std::shared_ptr<UserQuerySharedResources> makeUserQuerySharedResources(
////////////////////////////////////////////////////////////////////////
UserQueryFactory::UserQueryFactory(qproc::DatabaseModels::Ptr const& dbModels, std::string const& czarName)
: _userQuerySharedResources(makeUserQuerySharedResources(dbModels, czarName)),
_useQservRowCounterOptimization(true) {
std::shared_ptr<czar::CzarConfig> const czarConfig = czar::CzarConfig::instance();
_useQservRowCounterOptimization(true),
_asioIoService() {
auto const czarConfig = cconfig::CzarConfig::instance();
_executiveConfig = std::make_shared<qdisp::ExecutiveConfig>(
czarConfig->getXrootdFrontendUrl(), czarConfig->getQMetaSecondsBetweenChunkUpdates());

Expand All @@ -192,6 +193,24 @@ UserQueryFactory::UserQueryFactory(qproc::DatabaseModels::Ptr const& dbModels, s
// Add logging context with czar ID
qmeta::CzarId qMetaCzarId = _userQuerySharedResources->qMetaCzarId;
LOG_MDC_INIT([qMetaCzarId]() { LOG_MDC("CZID", std::to_string(qMetaCzarId)); });

// BOOST ASIO service is started to process asynchronous timer requests
// in the dedicated thread. However, before starting the thread we need
// to attach the ASIO's "work" object to the ASIO I/O service. This is needed
// to keep the latter busy and prevent the servicing thread from exiting before
// the destruction of this class due to a lack of async requests.
_asioWork.reset(new boost::asio::io_service::work(_asioIoService));

// Start the timer servicing thread
_asioTimerThread.reset(new std::thread([&]() { _asioIoService.run(); }));
}

UserQueryFactory::~UserQueryFactory() {
// Shut down all ongoing (if any) operations on the I/O service
// to unblock the servicing thread.
_asioWork.reset();
_asioIoService.stop();
_asioTimerThread->join();
}

UserQuery::Ptr UserQueryFactory::newUserQuery(std::string const& aQuery, std::string const& defaultDb,
Expand Down Expand Up @@ -287,8 +306,9 @@ UserQuery::Ptr UserQueryFactory::newUserQuery(std::string const& aQuery, std::st
std::shared_ptr<qdisp::Executive> executive;
std::shared_ptr<rproc::InfileMergerConfig> infileMergerConfig;
if (sessionValid) {
executive = qdisp::Executive::create(*_executiveConfig, messageStore, qdispSharedResources,
_userQuerySharedResources->queryStatsData, qs);
executive =
qdisp::Executive::create(*_executiveConfig, messageStore, qdispSharedResources,
_userQuerySharedResources->queryStatsData, qs, _asioIoService);
infileMergerConfig =
std::make_shared<rproc::InfileMergerConfig>(_userQuerySharedResources->mysqlResultConfig);
infileMergerConfig->debugNoMerge = _debugNoMerge;
Expand Down
14 changes: 14 additions & 0 deletions src/ccontrol/UserQueryFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@
// System headers
#include <cstdint>
#include <memory>
#include <thread>

// Third-party headers
#include "boost/asio.hpp"
#include "boost/utility.hpp"

// Local headers
Expand Down Expand Up @@ -69,6 +71,10 @@ class UserQueryFactory : private boost::noncopyable {
public:
UserQueryFactory(std::shared_ptr<qproc::DatabaseModels> const& dbModels, std::string const& czarName);

/// Non-trivial destructor is needed to stop the BOOST ASIO I/O service
/// and join with the timer servicing thread.
~UserQueryFactory();

/// @param query: Query text
/// @param defaultDb: Default database name, may be empty
/// @param qdispPool: Thread pool handling qdisp jobs.
Expand All @@ -85,6 +91,14 @@ class UserQueryFactory : private boost::noncopyable {
std::shared_ptr<qdisp::ExecutiveConfig> _executiveConfig;
bool _useQservRowCounterOptimization;
bool _debugNoMerge = false;
// BOOST ASIO service is started to process asynchronous timer requests
// in the dedicated thread. The thread is started by the c-tor of the class.
// The "work" object _asioWork is attached to the ASIO I/O service in order to
// keep the latter busy and prevent the servicing thread from exiting before
// the destruction of this class.
boost::asio::io_service _asioIoService;
std::unique_ptr<boost::asio::io_service::work> _asioWork;
std::unique_ptr<std::thread> _asioTimerThread;
};

} // namespace lsst::qserv::ccontrol
Expand Down
Loading

0 comments on commit 980d965

Please sign in to comment.