From a0a08bc0a5ab2891095ef388821c6e939ad36630 Mon Sep 17 00:00:00 2001 From: Igor Gaponenko Date: Thu, 8 Jun 2023 17:48:26 -0700 Subject: [PATCH 1/8] Minor fixes and refactoring in the code documentation --- src/wpublish/GetStatusQservRequest.h | 8 +------- src/wpublish/QservRequest.h | 18 +++++------------- 2 files changed, 6 insertions(+), 20 deletions(-) diff --git a/src/wpublish/GetStatusQservRequest.h b/src/wpublish/GetStatusQservRequest.h index 649d8a1721..45b1189362 100644 --- a/src/wpublish/GetStatusQservRequest.h +++ b/src/wpublish/GetStatusQservRequest.h @@ -65,13 +65,7 @@ class GetStatusQservRequest : public QservRequest { * and memory management of instances created otherwise (as values or via * low-level pointers). * - * @param includeTasks (optional) flag telling the worker service to include detailed - * info on the known tasks. - * @param queryIds (optional) collection of the queries for for selecting tasks. - * The parameter is ignored if 'includeTasks=false'. If the collection is empty - * then all tasks will be included into the status report. - * @param taskStates (optional) collection of the task state(s). If the collection - * is empty then the selector is disregarded. + * @param taskSelector (optional) task selection criterias. * @param onFinish (optional )callback function to be called upon the completion * (successful or not) of the request. * @see wbase::Task::Status diff --git a/src/wpublish/QservRequest.h b/src/wpublish/QservRequest.h index 3144b6ddbd..e46d5b4f41 100644 --- a/src/wpublish/QservRequest.h +++ b/src/wpublish/QservRequest.h @@ -37,7 +37,7 @@ namespace lsst::qserv::wpublish { /** * Class QservRequest is a base class for a family of the client-side requests - * (classes) to the Qserv worker management services. + * (classes) to Qserv workers. */ class QservRequest : public XrdSsiRequest { public: @@ -52,14 +52,12 @@ class QservRequest : public XrdSsiRequest { /** * Serialize a request into the provided buffer. The method is required to be * provided by a subclass. - * * @param buf A request buffer for serializing a request. */ virtual void onRequest(proto::FrameBuffer& buf) = 0; /** * Process response from Qserv. The method is required to be provided by a subclass. - * * @param view The buffer view for parsing results. */ virtual void onResponse(proto::FrameBufferView& view) = 0; @@ -67,28 +65,22 @@ class QservRequest : public XrdSsiRequest { /** * Notify a base class about a failure occurred when sending a request data * or receiving a response. - * * @param error A message explaining a reason of the failure. */ virtual void onError(std::string const& msg) = 0; char* GetRequest(int& dlen) override; - bool ProcessResponse(const XrdSsiErrInfo& eInfo, const XrdSsiRespInfo& rInfo) override; - void ProcessResponseData(const XrdSsiErrInfo& eInfo, char* buff, int blen, bool last) override; private: - // Request buffer (gets prepared by subclasses before sending a request - // to the worker service of Qserv) - - proto::FrameBuffer _frameBuf; ///< buffer for serializing messages before sending them - /// The global counter for the number of instances of any subclasses static std::atomic _numClassInstances; - // Response buffer (gets updated when receiving a response stream of - // data from a worker management service of Qserv) + /// Request buffer is prepared by subclasses before sending a request to a worker. + proto::FrameBuffer _frameBuf; + + // Response buffer is updated when receiving a response stream of data from a worker. /// The (very first and the) last increment of the capacity of the incoming /// buffer is used to limit the amount of bytes to be received from a server. From 7619ea3cde48c6e6e18443d61cf58cd192fce9b6 Mon Sep 17 00:00:00 2001 From: Igor Gaponenko Date: Tue, 6 Jun 2023 18:57:40 -0700 Subject: [PATCH 2/8] Minor refactoring in the XROOTD/SSI resource definition Eliminated unused definitions. Better names and return types for some methods. --- src/global/ResourceUnit.cc | 59 ++++------------------------------ src/global/ResourceUnit.h | 23 ++++--------- src/global/testResourceUnit.cc | 12 ++----- src/wpublish/ChunkInventory.cc | 2 +- src/xrdsvc/SsiProvider.cc | 4 +-- 5 files changed, 20 insertions(+), 80 deletions(-) diff --git a/src/global/ResourceUnit.cc b/src/global/ResourceUnit.cc index 897fe56e3b..b223a07056 100644 --- a/src/global/ResourceUnit.cc +++ b/src/global/ResourceUnit.cc @@ -74,8 +74,7 @@ std::string ResourceUnit::path() const { switch (_unitType) { case GARBAGE: return "/GARBAGE"; - case DBCHUNK: // For now, DBCHUNK is handled the same as CQUERY - case CQUERY: + case DBCHUNK: ss << _pathSep << _db; if (_chunk != -1) { ss << _pathSep << _chunk; @@ -84,9 +83,8 @@ std::string ResourceUnit::path() const { case UNKNOWN: ss << _pathSep << "UNKNOWN_RESOURCE_UNIT"; break; - case RESULT: case WORKER: - ss << _hashName; + ss << _workerId; break; default: ::abort(); @@ -107,12 +105,8 @@ std::string ResourceUnit::prefix(UnitType const& r) { switch (r) { case DBCHUNK: return "chk"; - case CQUERY: - return "q"; case UNKNOWN: return "UNKNOWN"; - case RESULT: - return "result"; case WORKER: return "worker"; case GARBAGE: @@ -122,10 +116,11 @@ std::string ResourceUnit::prefix(UnitType const& r) { } std::string ResourceUnit::makePath(int chunk, std::string const& db) { - return "/" + prefix(UnitType::DBCHUNK) + "/" + db + "/" + std::to_string(chunk); + return _pathSep + prefix(UnitType::DBCHUNK) + _pathSep + db + _pathSep + std::to_string(chunk); } + std::string ResourceUnit::makeWorkerPath(std::string const& id) { - return "/" + prefix(UnitType::WORKER) + "/" + id; + return _pathSep + prefix(UnitType::WORKER) + _pathSep + id; } void ResourceUnit::setAsDbChunk(std::string const& db, int chunk) { @@ -134,12 +129,6 @@ void ResourceUnit::setAsDbChunk(std::string const& db, int chunk) { _chunk = chunk; } -void ResourceUnit::setAsCquery(std::string const& db, int chunk) { - _unitType = CQUERY; - _db = db; - _chunk = chunk; -} - bool ResourceUnit::_markGarbageIfDone(Tokenizer& t) { if (t.done()) { _unitType = GARBAGE; @@ -182,48 +171,14 @@ void ResourceUnit::_setFromPath(std::string const& path) { } _chunk = t.tokenAsInt(); _ingestLeafAndKeys(t.token()); - } else if (rTypeString == prefix(CQUERY)) { - // Import as chunk query - _unitType = CQUERY; - if (_markGarbageIfDone(t)) { - return; - } - t.next(); - _db = t.token(); - if (_db.empty()) { - _unitType = GARBAGE; - return; - } - if (_markGarbageIfDone(t)) { - return; - } - t.next(); - if (t.token().empty()) { - _unitType = GARBAGE; - return; - } - _chunk = t.tokenAsInt(); - _ingestLeafAndKeys(t.token()); - - } else if (rTypeString == prefix(RESULT)) { - _unitType = RESULT; - if (_markGarbageIfDone(t)) { - return; - } - t.next(); - _hashName = t.token(); - if (_hashName.empty()) { - _unitType = GARBAGE; - return; - } } else if (rTypeString == prefix(WORKER)) { _unitType = WORKER; if (_markGarbageIfDone(t)) { return; } t.next(); - _hashName = t.token(); - if (_hashName.empty()) { + _workerId = t.token(); + if (_workerId.empty()) { _unitType = GARBAGE; return; } diff --git a/src/global/ResourceUnit.h b/src/global/ResourceUnit.h index 1e29c981af..57e01e9dc7 100644 --- a/src/global/ResourceUnit.h +++ b/src/global/ResourceUnit.h @@ -45,7 +45,7 @@ namespace lsst::qserv { class ResourceUnit { public: class Checker; - enum UnitType { GARBAGE, DBCHUNK, CQUERY, UNKNOWN, RESULT, WORKER }; + enum UnitType { GARBAGE, DBCHUNK, UNKNOWN, WORKER }; ResourceUnit() : _unitType(GARBAGE), _chunk(-1) {} @@ -56,9 +56,9 @@ class ResourceUnit { // Retrieve elements of the path. UnitType unitType() const { return _unitType; } - std::string db() const { return _db; } + std::string const& db() const { return _db; } int chunk() const { return _chunk; } - std::string hashName() const { return _hashName; } + std::string const& workerId() const { return _workerId; } /// Lookup extended path variables (?k=val syntax) std::string var(std::string const& key) const; @@ -75,15 +75,6 @@ class ResourceUnit { // Setup a path of a certain type. void setAsDbChunk(std::string const& db, int chunk = DUMMY_CHUNK); - // Compatibility types - void setAsCquery(std::string const& db, int chunk = DUMMY_CHUNK); - void setAsResult(std::string const& hashName); - - // Optional specifiers may not be supported by XrdSsi - // Add optional specifiers ?foo&bar=1&bar2=2 - void addKey(std::string const& key); - void addKey(std::string const& key, int val); - private: class Tokenizer; void _setFromPath(std::string const& path); @@ -91,10 +82,10 @@ class ResourceUnit { void _ingestKeyStr(std::string const& keyStr); bool _markGarbageIfDone(Tokenizer& t); - UnitType _unitType; //< Type of unit - std::string _db; //< for CQUERY and DBCHUNK types - int _chunk; //< for CQUERY and DBCHUNK types - std::string _hashName; //< for RESULT and WORKER types + UnitType _unitType = UnitType::GARBAGE; //< Type of unit + std::string _db; //< for DBCHUNK type + int _chunk = -1; //< for DBCHUNK type + std::string _workerId; //< for WORKER type typedef std::map VarMap; VarMap _vars; //< Key-value specifiers diff --git a/src/global/testResourceUnit.cc b/src/global/testResourceUnit.cc index 36125eece4..993da67dd0 100644 --- a/src/global/testResourceUnit.cc +++ b/src/global/testResourceUnit.cc @@ -81,17 +81,11 @@ BOOST_AUTO_TEST_CASE(DbChunk) { BOOST_CHECK_EQUAL(r[1].path(), "/chk/bar/968"); } -BOOST_AUTO_TEST_CASE(Old) { - ResourceUnit cq("/q/Foo/123"); - ResourceUnit res("/result/1234567890abcde"); - BOOST_CHECK_EQUAL(cq.unitType(), ResourceUnit::CQUERY); - BOOST_CHECK_EQUAL(res.unitType(), ResourceUnit::RESULT); -} - BOOST_AUTO_TEST_CASE(Worker) { - ResourceUnit res("/worker/worker-1"); + std::string const id = "worker-1"; + ResourceUnit res("/worker/" + id); BOOST_CHECK_EQUAL(res.unitType(), ResourceUnit::WORKER); - BOOST_CHECK_EQUAL(res.hashName(), "worker-1"); + BOOST_CHECK_EQUAL(res.workerId(), id); } BOOST_AUTO_TEST_SUITE_END() diff --git a/src/wpublish/ChunkInventory.cc b/src/wpublish/ChunkInventory.cc index 07fa4306ff..80312c17ab 100644 --- a/src/wpublish/ChunkInventory.cc +++ b/src/wpublish/ChunkInventory.cc @@ -141,7 +141,7 @@ class Validator : public lsst::qserv::ResourceUnit::Checker { case lsst::qserv::ResourceUnit::DBCHUNK: return chunkInventory.has(ru.db(), ru.chunk()); case lsst::qserv::ResourceUnit::WORKER: - return chunkInventory.id() == ru.hashName(); + return chunkInventory.id() == ru.workerId(); default: return false; } diff --git a/src/xrdsvc/SsiProvider.cc b/src/xrdsvc/SsiProvider.cc index a5ea63e213..62336749e3 100644 --- a/src/xrdsvc/SsiProvider.cc +++ b/src/xrdsvc/SsiProvider.cc @@ -163,7 +163,7 @@ XrdSsiProvider::rStat SsiProviderServer::QueryResource(char const* rName, char c } else if (ru.unitType() == ResourceUnit::WORKER) { // Extract the worker name and alidate it against the one which is // provided through the inventory - if (not _chunkInventory.id().empty() and _chunkInventory.id() == ru.hashName()) { + if (not _chunkInventory.id().empty() and _chunkInventory.id() == ru.workerId()) { LOGS(_log, LOG_LVL_DEBUG, "SsiProvider Query " << rName << " present"); return isPresent; } @@ -190,7 +190,7 @@ void SsiProviderServer::ResourceAdded(const char* rName) { } else if (ru.unitType() == ResourceUnit::WORKER) { // Replace the unique identifier of the worker with the new one - _chunkInventory.resetId(ru.hashName()); + _chunkInventory.resetId(ru.workerId()); LOGS(_log, LOG_LVL_DEBUG, "SsiProvider ResourceAdded " << rName); return; } From 12dc56585d143ac0c83c42d299ef5ec2972d93ff Mon Sep 17 00:00:00 2001 From: Igor Gaponenko Date: Tue, 6 Jun 2023 18:53:39 -0700 Subject: [PATCH 3/8] Added XROOTD/SSI resource type for operations over queries --- src/global/ResourceUnit.cc | 8 ++++++++ src/global/ResourceUnit.h | 9 ++++++--- src/global/testResourceUnit.cc | 7 +++++++ 3 files changed, 21 insertions(+), 3 deletions(-) diff --git a/src/global/ResourceUnit.cc b/src/global/ResourceUnit.cc index b223a07056..0dc548cbce 100644 --- a/src/global/ResourceUnit.cc +++ b/src/global/ResourceUnit.cc @@ -109,6 +109,8 @@ std::string ResourceUnit::prefix(UnitType const& r) { return "UNKNOWN"; case WORKER: return "worker"; + case QUERY: + return "query"; case GARBAGE: default: return "GARBAGE"; @@ -182,6 +184,12 @@ void ResourceUnit::_setFromPath(std::string const& path) { _unitType = GARBAGE; return; } + } else if (rTypeString == prefix(QUERY)) { + _unitType = QUERY; + if (!t.done()) { + _unitType = GARBAGE; + return; + } } else { _unitType = GARBAGE; } diff --git a/src/global/ResourceUnit.h b/src/global/ResourceUnit.h index 57e01e9dc7..b141c27c59 100644 --- a/src/global/ResourceUnit.h +++ b/src/global/ResourceUnit.h @@ -45,16 +45,19 @@ namespace lsst::qserv { class ResourceUnit { public: class Checker; - enum UnitType { GARBAGE, DBCHUNK, UNKNOWN, WORKER }; - - ResourceUnit() : _unitType(GARBAGE), _chunk(-1) {} + enum UnitType { GARBAGE, DBCHUNK, UNKNOWN, WORKER, QUERY }; + ResourceUnit() = default; explicit ResourceUnit(std::string const& path); + ResourceUnit(ResourceUnit const&) = default; + ResourceUnit& operator=(ResourceUnit const&) = default; + ~ResourceUnit() = default; /// @return the constructed path. std::string path() const; // Retrieve elements of the path. + UnitType unitType() const { return _unitType; } std::string const& db() const { return _db; } int chunk() const { return _chunk; } diff --git a/src/global/testResourceUnit.cc b/src/global/testResourceUnit.cc index 993da67dd0..e0fd222791 100644 --- a/src/global/testResourceUnit.cc +++ b/src/global/testResourceUnit.cc @@ -88,4 +88,11 @@ BOOST_AUTO_TEST_CASE(Worker) { BOOST_CHECK_EQUAL(res.workerId(), id); } +BOOST_AUTO_TEST_CASE(Query) { + ResourceUnit const res1("/query"); + BOOST_CHECK_EQUAL(res1.unitType(), ResourceUnit::QUERY); + ResourceUnit const res2("/query/abc"); + BOOST_CHECK_EQUAL(res2.unitType(), ResourceUnit::GARBAGE); +} + BOOST_AUTO_TEST_SUITE_END() From c937595eff8f2eb871ed003da01f34b5247e8aea Mon Sep 17 00:00:00 2001 From: Igor Gaponenko Date: Fri, 9 Jun 2023 18:03:27 -0700 Subject: [PATCH 4/8] More reliable implementation for cancelling XROOTD/SSI request The new implementation is based on the smart pointer to self be stored by the Qserv request classes. It would guarantee the life span of the request objects while request processing is still going on. Otherwse, there is a chance of running into crashes. The request cancellaton method has been added to replace SSI's Finished(true) to do a proper clean up of the stored pointer. --- src/replica/AddReplicaQservMgtRequest.cc | 9 +----- src/replica/GetReplicasQservMgtRequest.cc | 9 +----- src/replica/GetStatusQservMgtRequest.cc | 6 +--- src/replica/RemoveReplicaQservMgtRequest.cc | 9 +----- src/replica/SetReplicasQservMgtRequest.cc | 9 +----- src/replica/TestEchoQservMgtRequest.cc | 9 +----- src/wpublish/ChunkGroupQservRequest.cc | 8 +++-- src/wpublish/ChunkListQservRequest.cc | 8 +++-- src/wpublish/GetChunkListQservRequest.cc | 4 ++- src/wpublish/GetStatusQservRequest.cc | 4 ++- src/wpublish/QservRequest.cc | 36 +++++++++++++++++++-- src/wpublish/QservRequest.h | 25 +++++++++++++- src/wpublish/SetChunkListQservRequest.cc | 4 ++- src/wpublish/TestEchoQservRequest.cc | 4 ++- src/wpublish/qserv-worker-perf.cc | 3 +- src/wpublish/qserv-worker-status.cc | 3 +- 16 files changed, 90 insertions(+), 60 deletions(-) diff --git a/src/replica/AddReplicaQservMgtRequest.cc b/src/replica/AddReplicaQservMgtRequest.cc index 35526cdd33..a2f79d66b3 100644 --- a/src/replica/AddReplicaQservMgtRequest.cc +++ b/src/replica/AddReplicaQservMgtRequest.cc @@ -113,15 +113,8 @@ void AddReplicaQservMgtRequest::finishImpl(replica::Lock const& lock) { switch (extendedState()) { case ExtendedState::CANCELLED: case ExtendedState::TIMEOUT_EXPIRED: - - // And if the SSI request is still around then tell it to stop - - if (_qservRequest) { - bool const cancel = true; - _qservRequest->Finished(cancel); - } + if (_qservRequest) _qservRequest->cancel(); break; - default: break; } diff --git a/src/replica/GetReplicasQservMgtRequest.cc b/src/replica/GetReplicasQservMgtRequest.cc index d142bb9902..36f230b0c1 100644 --- a/src/replica/GetReplicasQservMgtRequest.cc +++ b/src/replica/GetReplicasQservMgtRequest.cc @@ -146,15 +146,8 @@ void GetReplicasQservMgtRequest::finishImpl(replica::Lock const& lock) { switch (extendedState()) { case ExtendedState::CANCELLED: case ExtendedState::TIMEOUT_EXPIRED: - - // And if the SSI request is still around then tell it to stop - - if (_qservRequest) { - bool const cancel = true; - _qservRequest->Finished(cancel); - } + if (_qservRequest) _qservRequest->cancel(); break; - default: break; } diff --git a/src/replica/GetStatusQservMgtRequest.cc b/src/replica/GetStatusQservMgtRequest.cc index a5b3e9f18c..aa3483a580 100644 --- a/src/replica/GetStatusQservMgtRequest.cc +++ b/src/replica/GetStatusQservMgtRequest.cc @@ -115,11 +115,7 @@ void GetStatusQservMgtRequest::finishImpl(replica::Lock const& lock) { switch (extendedState()) { case ExtendedState::CANCELLED: case ExtendedState::TIMEOUT_EXPIRED: - // And if the SSI request is still around then tell it to stop - if (_qservRequest) { - bool const cancel = true; - _qservRequest->Finished(cancel); - } + if (_qservRequest) _qservRequest->cancel(); break; default: break; diff --git a/src/replica/RemoveReplicaQservMgtRequest.cc b/src/replica/RemoveReplicaQservMgtRequest.cc index 7992eb983e..2d89b2f14e 100644 --- a/src/replica/RemoveReplicaQservMgtRequest.cc +++ b/src/replica/RemoveReplicaQservMgtRequest.cc @@ -116,15 +116,8 @@ void RemoveReplicaQservMgtRequest::finishImpl(replica::Lock const& lock) { switch (extendedState()) { case ExtendedState::CANCELLED: case ExtendedState::TIMEOUT_EXPIRED: - - // And if the SSI request is still around then tell it to stop - - if (_qservRequest) { - bool const cancel = true; - _qservRequest->Finished(cancel); - } + if (_qservRequest) _qservRequest->cancel(); break; - default: break; } diff --git a/src/replica/SetReplicasQservMgtRequest.cc b/src/replica/SetReplicasQservMgtRequest.cc index ccc7aae710..e031ab67d3 100644 --- a/src/replica/SetReplicasQservMgtRequest.cc +++ b/src/replica/SetReplicasQservMgtRequest.cc @@ -146,15 +146,8 @@ void SetReplicasQservMgtRequest::finishImpl(replica::Lock const& lock) { switch (extendedState()) { case ExtendedState::CANCELLED: case ExtendedState::TIMEOUT_EXPIRED: - - // And if the SSI request is still around then tell it to stop - - if (_qservRequest) { - bool const cancel = true; - _qservRequest->Finished(cancel); - } + if (_qservRequest) _qservRequest->cancel(); break; - default: break; } diff --git a/src/replica/TestEchoQservMgtRequest.cc b/src/replica/TestEchoQservMgtRequest.cc index 7fcdf62181..04f4c48aad 100644 --- a/src/replica/TestEchoQservMgtRequest.cc +++ b/src/replica/TestEchoQservMgtRequest.cc @@ -113,15 +113,8 @@ void TestEchoQservMgtRequest::finishImpl(replica::Lock const& lock) { switch (extendedState()) { case ExtendedState::CANCELLED: case ExtendedState::TIMEOUT_EXPIRED: - - // And if the SSI request is still around then tell it to stop - - if (_qservRequest) { - bool const cancel = true; - _qservRequest->Finished(cancel); - } + if (_qservRequest) _qservRequest->cancel(); break; - default: break; } diff --git a/src/wpublish/ChunkGroupQservRequest.cc b/src/wpublish/ChunkGroupQservRequest.cc index 47311e2e28..0da4d7fd48 100644 --- a/src/wpublish/ChunkGroupQservRequest.cc +++ b/src/wpublish/ChunkGroupQservRequest.cc @@ -138,7 +138,9 @@ void ChunkGroupQservRequest::onError(string const& error) { AddChunkGroupQservRequest::Ptr AddChunkGroupQservRequest::create(unsigned int chunk, vector const& databases, CallbackType onFinish) { - return AddChunkGroupQservRequest::Ptr(new AddChunkGroupQservRequest(chunk, databases, onFinish)); + AddChunkGroupQservRequest::Ptr ptr(new AddChunkGroupQservRequest(chunk, databases, onFinish)); + ptr->setRefToSelf4keepAlive(ptr); + return ptr; } AddChunkGroupQservRequest::AddChunkGroupQservRequest(unsigned int chunk, vector const& databases, @@ -148,8 +150,10 @@ AddChunkGroupQservRequest::AddChunkGroupQservRequest(unsigned int chunk, vector< RemoveChunkGroupQservRequest::Ptr RemoveChunkGroupQservRequest::create(unsigned int chunk, vector const& databases, bool force, CallbackType onFinish) { - return RemoveChunkGroupQservRequest::Ptr( + RemoveChunkGroupQservRequest::Ptr ptr( new RemoveChunkGroupQservRequest(chunk, databases, force, onFinish)); + ptr->setRefToSelf4keepAlive(ptr); + return ptr; } RemoveChunkGroupQservRequest::RemoveChunkGroupQservRequest(unsigned int chunk, diff --git a/src/wpublish/ChunkListQservRequest.cc b/src/wpublish/ChunkListQservRequest.cc index aacb9b68e0..b5c0016d17 100644 --- a/src/wpublish/ChunkListQservRequest.cc +++ b/src/wpublish/ChunkListQservRequest.cc @@ -142,7 +142,9 @@ void ChunkListQservRequest::onError(string const& error) { ReloadChunkListQservRequest::Ptr ReloadChunkListQservRequest::create( ChunkListQservRequest::CallbackType onFinish) { - return ReloadChunkListQservRequest::Ptr(new ReloadChunkListQservRequest(onFinish)); + ReloadChunkListQservRequest::Ptr ptr(new ReloadChunkListQservRequest(onFinish)); + ptr->setRefToSelf4keepAlive(ptr); + return ptr; } ReloadChunkListQservRequest::ReloadChunkListQservRequest(ChunkListQservRequest::CallbackType onFinish) @@ -150,7 +152,9 @@ ReloadChunkListQservRequest::ReloadChunkListQservRequest(ChunkListQservRequest:: RebuildChunkListQservRequest::Ptr RebuildChunkListQservRequest::create( bool reload, ChunkListQservRequest::CallbackType onFinish) { - return RebuildChunkListQservRequest::Ptr(new RebuildChunkListQservRequest(reload, onFinish)); + RebuildChunkListQservRequest::Ptr ptr(new RebuildChunkListQservRequest(reload, onFinish)); + ptr->setRefToSelf4keepAlive(ptr); + return ptr; } RebuildChunkListQservRequest::RebuildChunkListQservRequest(bool reload, diff --git a/src/wpublish/GetChunkListQservRequest.cc b/src/wpublish/GetChunkListQservRequest.cc index 0bc16866b2..c2b593a7df 100644 --- a/src/wpublish/GetChunkListQservRequest.cc +++ b/src/wpublish/GetChunkListQservRequest.cc @@ -63,7 +63,9 @@ string GetChunkListQservRequest::status2str(Status status) { GetChunkListQservRequest::Ptr GetChunkListQservRequest::create( bool inUseOnly, GetChunkListQservRequest::CallbackType onFinish) { - return GetChunkListQservRequest::Ptr(new GetChunkListQservRequest(inUseOnly, onFinish)); + GetChunkListQservRequest::Ptr ptr(new GetChunkListQservRequest(inUseOnly, onFinish)); + ptr->setRefToSelf4keepAlive(ptr); + return ptr; } GetChunkListQservRequest::GetChunkListQservRequest(bool inUseOnly, diff --git a/src/wpublish/GetStatusQservRequest.cc b/src/wpublish/GetStatusQservRequest.cc index 98fb0c1421..f4522ecfd6 100644 --- a/src/wpublish/GetStatusQservRequest.cc +++ b/src/wpublish/GetStatusQservRequest.cc @@ -49,7 +49,9 @@ string GetStatusQservRequest::status2str(Status status) { GetStatusQservRequest::Ptr GetStatusQservRequest::create(wbase::TaskSelector const& taskSelector, GetStatusQservRequest::CallbackType onFinish) { - return GetStatusQservRequest::Ptr(new GetStatusQservRequest(taskSelector, onFinish)); + GetStatusQservRequest::Ptr ptr(new GetStatusQservRequest(taskSelector, onFinish)); + ptr->setRefToSelf4keepAlive(ptr); + return ptr; } GetStatusQservRequest::GetStatusQservRequest(wbase::TaskSelector const& taskSelector, diff --git a/src/wpublish/QservRequest.cc b/src/wpublish/QservRequest.cc index 63270c8dec..a591d0bdde 100644 --- a/src/wpublish/QservRequest.cc +++ b/src/wpublish/QservRequest.cc @@ -24,7 +24,8 @@ #include "wpublish/QservRequest.h" // System headers -#include +#include +#include // Qserv headers #include "lsst/log/Log.h" @@ -62,6 +63,23 @@ QservRequest::QservRequest() LOGS(_log, LOG_LVL_DEBUG, "QservRequest constructed instances: " << _numClassInstances); } +void QservRequest::cancel() { + // This will decrement the reference counter to the pointee at the end of the current + // block regardless of any exceptions that may be thrown below. + auto self = move(_refToSelf4keepAlive); + Finished(true); +} + +void QservRequest::setRefToSelf4keepAlive(shared_ptr ptr) { + if ((ptr == nullptr) || (this != ptr.get())) { + stringstream ss; + ss << "QservRequest::" << __func__ << ": the value of " << ptr + << " passed as an argument is not pointing to the current object."; + throw invalid_argument(ss.str()); + } + _refToSelf4keepAlive = ptr; +} + char* QservRequest::GetRequest(int& dlen) { // Ask a subclass to serialize its request into the frame buffer onRequest(_frameBuf); @@ -75,6 +93,10 @@ bool QservRequest::ProcessResponse(const XrdSsiErrInfo& eInfo, const XrdSsiRespI string const context = "QservRequest::" + string(__func__) + " "; if (eInfo.hasError()) { + // This will decrement the reference counter to the pointee at the end of the current + // block regardless of any exceptions that may be thrown below. + auto self = move(_refToSelf4keepAlive); + // Copy the argument before sending the upstream notification // Otherwise the current object may get disposed before we even had // a chance to notify XRootD/SSI by calling Finished(). @@ -89,7 +111,6 @@ bool QservRequest::ProcessResponse(const XrdSsiErrInfo& eInfo, const XrdSsiRespI // WARNING: This has to be the last call as the object may get deleted // downstream. onError(errorStr); - return false; } LOGS(_log, LOG_LVL_DEBUG, @@ -105,6 +126,9 @@ bool QservRequest::ProcessResponse(const XrdSsiErrInfo& eInfo, const XrdSsiRespI return true; default: + // This will decrement the reference counter to the pointee at the end of the current + // block regardless of any exceptions that may be thrown below. + auto self = move(_refToSelf4keepAlive); // Copy the argument before sending the upstream notification // Otherwise the current object may get disposed before we even had @@ -128,6 +152,10 @@ void QservRequest::ProcessResponseData(const XrdSsiErrInfo& eInfo, char* buff, i LOGS(_log, LOG_LVL_DEBUG, context << "eInfo.isOK: " << eInfo.isOK()); if (not eInfo.isOK()) { + // This will decrement the reference counter to the pointee at the end of the current + // block regardless of any exceptions that may be thrown below. + auto self = move(_refToSelf4keepAlive); + // Copy these arguments before sending the upstream notification. // Otherwise the current object may get disposed before we even had // a chance to notify XRootD/SSI by calling Finished(). @@ -153,6 +181,10 @@ void QservRequest::ProcessResponseData(const XrdSsiErrInfo& eInfo, char* buff, i _bufSize += blen; if (last) { + // This will decrement the reference counter to the pointee at the end of the current + // block regardless of any exceptions that may be thrown below. + auto self = move(_refToSelf4keepAlive); + // Tell XrootD to release all resources associated with this request Finished(); diff --git a/src/wpublish/QservRequest.h b/src/wpublish/QservRequest.h index e46d5b4f41..835cdab3f9 100644 --- a/src/wpublish/QservRequest.h +++ b/src/wpublish/QservRequest.h @@ -25,6 +25,8 @@ // System headers #include +#include +#include // Third party headers #include "XrdSsi/XrdSsiRequest.hh" @@ -43,12 +45,29 @@ class QservRequest : public XrdSsiRequest { public: QservRequest(QservRequest const&) = delete; QservRequest& operator=(QservRequest const&) = delete; - ~QservRequest() override; + /** + * Do a proper request cancellation to ensure a pointer to the request gets deleted + * after calling XrdSsiRequest::Finished(true). + */ + void cancel(); + protected: QservRequest(); + /** + * Setting a pointer to the object would guarantee that the life expectancy + * of the request be preserved before it's finished/failed and the corresponding + * notifications are sent to a subclass via the virtual methods QservRequest::onResponse() + * or QservRequest::onError(). The pointer will be reset after calling either of + * these methods, or the method QservRequest::cancel(). + * @param ptr The pointer to be set. + * @throws std::invalid_argument if the pointer is empty or pointing to a different + * request object. + */ + void setRefToSelf4keepAlive(std::shared_ptr ptr); + /** * Serialize a request into the provided buffer. The method is required to be * provided by a subclass. @@ -90,6 +109,10 @@ class QservRequest : public XrdSsiRequest { int _bufCapacity; ///< total capacity of the incoming buffer char* _buf; ///< buffer for incomming data + + /// The reference to the object is needed to guarantee the life expectency of + /// the request object while the request is still being processed. + std::shared_ptr _refToSelf4keepAlive; }; } // namespace lsst::qserv::wpublish diff --git a/src/wpublish/SetChunkListQservRequest.cc b/src/wpublish/SetChunkListQservRequest.cc index da8b4a90e8..12806701d4 100644 --- a/src/wpublish/SetChunkListQservRequest.cc +++ b/src/wpublish/SetChunkListQservRequest.cc @@ -72,7 +72,9 @@ string SetChunkListQservRequest::status2str(Status status) { SetChunkListQservRequest::Ptr SetChunkListQservRequest::create( SetChunkListQservRequest::ChunkCollection const& chunks, vector const& databases, bool force, SetChunkListQservRequest::CallbackType onFinish) { - return SetChunkListQservRequest::Ptr(new SetChunkListQservRequest(chunks, databases, force, onFinish)); + SetChunkListQservRequest::Ptr ptr(new SetChunkListQservRequest(chunks, databases, force, onFinish)); + ptr->setRefToSelf4keepAlive(ptr); + return ptr; } SetChunkListQservRequest::SetChunkListQservRequest(SetChunkListQservRequest::ChunkCollection const& chunks, diff --git a/src/wpublish/TestEchoQservRequest.cc b/src/wpublish/TestEchoQservRequest.cc index c0b2f4c85d..d6df7166ff 100644 --- a/src/wpublish/TestEchoQservRequest.cc +++ b/src/wpublish/TestEchoQservRequest.cc @@ -65,7 +65,9 @@ string TestEchoQservRequest::status2str(Status status) { TestEchoQservRequest::Ptr TestEchoQservRequest::create(string const& value, TestEchoQservRequest::CallbackType onFinish) { - return TestEchoQservRequest::Ptr(new TestEchoQservRequest(value, onFinish)); + TestEchoQservRequest::Ptr ptr(new TestEchoQservRequest(value, onFinish)); + ptr->setRefToSelf4keepAlive(ptr); + return ptr; } TestEchoQservRequest::TestEchoQservRequest(string const& value, TestEchoQservRequest::CallbackType onFinish) diff --git a/src/wpublish/qserv-worker-perf.cc b/src/wpublish/qserv-worker-perf.cc index 89151615b8..3a17fa6cf2 100644 --- a/src/wpublish/qserv-worker-perf.cc +++ b/src/wpublish/qserv-worker-perf.cc @@ -126,8 +126,7 @@ int test() { util::BlockPost blockPost(cancelAfterMs, cancelAfterMs + 1); blockPost.wait(); for (auto&& request : requests) { - bool const cancel = true; - request->Finished(cancel); + request->cancel(); } } return 0; diff --git a/src/wpublish/qserv-worker-status.cc b/src/wpublish/qserv-worker-status.cc index 1d54c47878..9ce7c580f3 100644 --- a/src/wpublish/qserv-worker-status.cc +++ b/src/wpublish/qserv-worker-status.cc @@ -126,8 +126,7 @@ int test() { util::BlockPost blockPost(cancelAfterMs, cancelAfterMs + 1); blockPost.wait(); for (auto&& request : requests) { - bool const cancel = true; - request->Finished(cancel); + request->cancel(); } } return 0; From 076f6e44e266cf8e5836b04b9db2a5d64b62831f Mon Sep 17 00:00:00 2001 From: Igor Gaponenko Date: Thu, 8 Jun 2023 14:54:12 -0700 Subject: [PATCH 5/8] Extended Czar-workers protocol (Protobuf) for query management Added the corresponidng requests handler at workers. Added a new transient class representing the API for sending query management requests. Added a command-line tool to test requests. --- src/proto/worker.proto | 29 +++++ src/wpublish/CMakeLists.txt | 30 +++++ src/wpublish/ChunkInventory.cc | 2 + src/wpublish/QueryManagementAction.cc | 138 ++++++++++++++++++++++ src/wpublish/QueryManagementAction.h | 96 ++++++++++++++++ src/wpublish/QueryManagementRequest.cc | 106 +++++++++++++++++ src/wpublish/QueryManagementRequest.h | 101 +++++++++++++++++ src/wpublish/qserv-query-management.cc | 151 +++++++++++++++++++++++++ src/xrdsvc/SsiProvider.cc | 3 + src/xrdsvc/SsiRequest.cc | 23 ++++ 10 files changed, 679 insertions(+) create mode 100644 src/wpublish/QueryManagementAction.cc create mode 100644 src/wpublish/QueryManagementAction.h create mode 100644 src/wpublish/QueryManagementRequest.cc create mode 100644 src/wpublish/QueryManagementRequest.h create mode 100644 src/wpublish/qserv-query-management.cc diff --git a/src/proto/worker.proto b/src/proto/worker.proto index 14fa25100e..e4b6fd7737 100644 --- a/src/proto/worker.proto +++ b/src/proto/worker.proto @@ -352,3 +352,32 @@ message WorkerCommandGetStatusR { // Status info serialized from a JSON object required string info = 1; } + + +///////////////////////////////////////////////////////////////// +// Protocol definition for the query management requests. These +// requests do not require any response messages to be explicitly +// sent by workers. +// +// ATTENTION: each message sent to a worker must be preceeded by +// an int32 size (network-byte-ordered) word carrying a size +// of the message. +//////////////////////////////////////////////////////////////// + +message QueryManagement { + + // Supported operations + enum Operation { + + // Cancel older queries before the specified query (excluding that one). + CANCEL_AFTER_RESTART = 1; + + // Cancel a specific query. + CANCEL = 2; + + // Notify workers on the completion of the specified query. + COMPLETE = 3; + } + required Operation op = 1; + required uint64 query_id = 2; +} diff --git a/src/wpublish/CMakeLists.txt b/src/wpublish/CMakeLists.txt index 18d5f9f642..3d76fd102c 100644 --- a/src/wpublish/CMakeLists.txt +++ b/src/wpublish/CMakeLists.txt @@ -13,6 +13,8 @@ target_sources(wpublish PRIVATE GetStatusQservRequest.cc QservRequest.cc QueriesAndChunks.cc + QueryManagementAction.cc + QueryManagementRequest.cc RemoveChunkGroupCommand.cc ResourceMonitor.cc SetChunkListCommand.cc @@ -27,7 +29,10 @@ target_include_directories(wpublish PRIVATE target_link_libraries(wpublish PUBLIC log + proto + protobuf XrdSsiLib + XrdCl ) add_executable(testChunkInventory testChunkInventory.cc) @@ -39,3 +44,28 @@ target_link_libraries(testChunkInventory PUBLIC ) add_test(NAME testChunkInventory COMMAND testChunkInventory) + + +FUNCTION(WPUBLISH_UTILS) + FOREACH(UTIL IN ITEMS ${ARGV}) + add_executable(${UTIL}) + target_sources(${UTIL} PRIVATE ${UTIL}.cc) + target_include_directories(${UTIL} PRIVATE ${XROOTD_INCLUDE_DIRS}) + target_link_libraries(${UTIL} PRIVATE + crypto + pthread + util + wpublish + xrdsvc + ) + install(TARGETS ${UTIL}) + ENDFOREACH() +ENDFUNCTION() + +wpublish_utils( + qserv-query-management +) + +install( + TARGETS wpublish +) diff --git a/src/wpublish/ChunkInventory.cc b/src/wpublish/ChunkInventory.cc index 80312c17ab..436f8606a9 100644 --- a/src/wpublish/ChunkInventory.cc +++ b/src/wpublish/ChunkInventory.cc @@ -142,6 +142,8 @@ class Validator : public lsst::qserv::ResourceUnit::Checker { return chunkInventory.has(ru.db(), ru.chunk()); case lsst::qserv::ResourceUnit::WORKER: return chunkInventory.id() == ru.workerId(); + case lsst::qserv::ResourceUnit::QUERY: + return true; default: return false; } diff --git a/src/wpublish/QueryManagementAction.cc b/src/wpublish/QueryManagementAction.cc new file mode 100644 index 0000000000..5533430b68 --- /dev/null +++ b/src/wpublish/QueryManagementAction.cc @@ -0,0 +1,138 @@ +// -*- LSST-C++ -*- +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ + +// Class header +#include "wpublish/QueryManagementAction.h" + +// System headers +#include + +// Third party headers +#include "XrdCl/XrdClFile.hh" +#include "XrdCl/XrdClXRootDResponses.hh" +#include "XrdSsi/XrdSsiProvider.hh" +#include "XrdSsi/XrdSsiService.hh" + +// Qserv headers +#include "wpublish/QueryManagementRequest.h" + +// LSST headers +#include "lsst/log/Log.h" + +/// This C++ symbol is provided by the SSI shared library +extern XrdSsiProvider* XrdSsiProviderClient; + +using namespace std; + +namespace { +LOG_LOGGER _log = LOG_GET("lsst.qserv.wpublish.QueryManagementAction"); + +string xrootdStatus2str(XrdCl::XRootDStatus const& s) { + return "status=" + to_string(s.status) + ", code=" + to_string(s.code) + ", errNo=" + to_string(s.errNo) + + ", message='" + s.GetErrorMessage() + "'"; +} + +/// The RAII wrapper around the silly C pointer to facilitate proper deletion +/// of the object returned by the XROOTD API. +struct LocationInfoRAII { + XrdCl::LocationInfo* locationInfo = nullptr; + ~LocationInfoRAII() { delete locationInfo; } +}; + +} // namespace + +namespace lsst::qserv::wpublish { + +void QueryManagementAction::notifyAllWorkers(string const& xrootdFrontendUrl, + proto::QueryManagement::Operation op, QueryId queryId, + CallbackType onFinish) { + auto const ptr = shared_ptr(new QueryManagementAction()); + ptr->_notifyAllWorkers(xrootdFrontendUrl, op, queryId, onFinish); +} + +QueryManagementAction::QueryManagementAction() { + LOGS(_log, LOG_LVL_DEBUG, "QueryManagementAction ** CONSTRUCTED **"); +} + +QueryManagementAction::~QueryManagementAction() { + LOGS(_log, LOG_LVL_DEBUG, "QueryManagementAction ** DELETED **"); +} + +void QueryManagementAction::_notifyAllWorkers(std::string const& xrootdFrontendUrl, + proto::QueryManagement::Operation op, QueryId queryId, + CallbackType onFinish) { + string const context = "QueryManagementAction::" + string(__func__) + " "; + + // Find all subscribers (worker XROOTD servers) serving this special resource. + // Throw an exception if no workers are registered. + ::LocationInfoRAII locationInfoHandler; + string const queryResourceName = "/query"; + XrdCl::FileSystem fileSystem(xrootdFrontendUrl); + XrdCl::XRootDStatus const status = fileSystem.Locate(queryResourceName, XrdCl::OpenFlags::Flags::None, + locationInfoHandler.locationInfo); + if (!status.IsOK()) { + throw runtime_error(context + "failed to locate subscribers for resource " + queryResourceName + + ", " + ::xrootdStatus2str(status)); + } + if (uint32_t const numLocations = locationInfoHandler.locationInfo->GetSize(); numLocations == 0) { + throw runtime_error(context + "no subscribers are serving resource " + queryResourceName); + } else { + // Fill worker addresses as keys into the response object. + for (uint32_t i = 0; i < numLocations; ++i) { + _response[locationInfoHandler.locationInfo->At(i).GetAddress()] = string(); + } + } + + // Send a request to each worker. Note capturing a copy of 'self' to ensure + // the curent object will still existr while the requests will be being processed. + auto const self = shared_from_this(); + for (auto itr : _response) { + string const workerAddress = itr.first; + + // Connect to the worker service + XrdSsiErrInfo errInfo; + XrdSsiService* serviceProvider = XrdSsiProviderClient->GetService(errInfo, workerAddress); + if (nullptr == serviceProvider) { + throw runtime_error(context + " failed to contact worker service " + workerAddress + + ", error: " + errInfo.Get()); + } + + // Make and configure the request object + auto request = wpublish::QueryManagementRequest::create( + op, queryId, + [self, workerAddress, onFinish](wpublish::QueryManagementRequest::Status status, + string const& error) { + if (status != wpublish::QueryManagementRequest::Status::SUCCESS) { + self->_response[workerAddress] = error; + } + if (++(self->_numWorkerRequestsFinished) == self->_response.size()) { + if (onFinish != nullptr) onFinish(self->_response); + } + }); + + // Initiate request processing + XrdSsiResource resource(queryResourceName); + serviceProvider->ProcessRequest(*request, resource); + } +} + +} // namespace lsst::qserv::wpublish diff --git a/src/wpublish/QueryManagementAction.h b/src/wpublish/QueryManagementAction.h new file mode 100644 index 0000000000..78f79e3250 --- /dev/null +++ b/src/wpublish/QueryManagementAction.h @@ -0,0 +1,96 @@ +// -*- LSST-C++ -*- +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ +#ifndef LSST_QSERV_WPUBLISH_QUERY_MANAGEMENT_ACTION_H +#define LSST_QSERV_WPUBLISH_QUERY_MANAGEMENT_ACTION_H + +// System headers +#include +#include +#include +#include +#include + +// Qserv headers +#include "global/intTypes.h" +#include "proto/worker.pb.h" + +namespace lsst::qserv::wpublish { + +/** + * Class QueryManagementAction is an interface for managing query completion/cancellation + * at all Qserv workers that are connected as "publishers" to the XROOTD redirector. + */ +class QueryManagementAction : public std::enable_shared_from_this { +public: + /// The reponse type represents errors reported by the workers, where worker + /// names are the keys. And the values are the error messages. Empty strings + /// indicate the succesful completion of the requests. + using Response = std::map; + + /// The callback function type to be used for notifications on the operation completion. + using CallbackType = std::function; + + /** + * The front-end method for initiating the operation at all workers. + * + * @note The only way to track the completion of the requests sent via + * this interface is by providing the callback function. The request delivery + * is not guaranteeded in case if the XROOTD/SSI network will be clogged by + * the heavy traffic. It's safe to call the same operation many times if needed. + * + * @param xrootdFrontendUrl A location of the XROOTD redirector. + * @param op An operation be initiated at the workers. + * @param onFinish The optional callback to be fired upon the completion of + * the requested operation. + * + * @throws std::runtime_error For failures encountered when connecting to + * the manager or initiating the requesed operation. + */ + static void notifyAllWorkers(std::string const& xrootdFrontendUrl, proto::QueryManagement::Operation op, + QueryId queryId, CallbackType onFinish = nullptr); + + QueryManagementAction(QueryManagementAction const&) = delete; + QueryManagementAction& operator=(QueryManagementAction const&) = delete; + virtual ~QueryManagementAction(); + +private: + QueryManagementAction(); + + /** + * The actual implementation of the request processor. + * @see QueryManagementAction::notifyAllWorkers() + */ + void _notifyAllWorkers(std::string const& xrootdFrontendUrl, proto::QueryManagement::Operation op, + QueryId queryId, CallbackType onFinish); + + /// The collection of worker responses. + Response _response; + + /// The counter will get incremented as worker responses will be received. + /// User-provided callback function (if any) will be called when all requests + /// will finish (succeed or fail). + std::atomic _numWorkerRequestsFinished{0}; +}; + +} // namespace lsst::qserv::wpublish + +#endif // LSST_QSERV_WPUBLISH_QUERY_MANAGEMENT_ACTION_H diff --git a/src/wpublish/QueryManagementRequest.cc b/src/wpublish/QueryManagementRequest.cc new file mode 100644 index 0000000000..c352c01c16 --- /dev/null +++ b/src/wpublish/QueryManagementRequest.cc @@ -0,0 +1,106 @@ +// -*- LSST-C++ -*- +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ + +// Class header +#include "wpublish/QueryManagementRequest.h" + +// System headers +#include + +// LSST headers +#include "lsst/log/Log.h" + +using namespace std; + +namespace { +LOG_LOGGER _log = LOG_GET("lsst.qserv.wpublish.QueryManagementRequest"); +} // namespace + +namespace lsst::qserv::wpublish { + +string QueryManagementRequest::status2str(Status status) { + switch (status) { + case SUCCESS: + return "SUCCESS"; + case ERROR: + return "ERROR"; + } + throw domain_error("QueryManagementRequest::" + string(__func__) + + " no match for status: " + to_string(status)); +} + +QueryManagementRequest::Ptr QueryManagementRequest::create(proto::QueryManagement::Operation op, + QueryId queryId, + QueryManagementRequest::CallbackType onFinish) { + QueryManagementRequest::Ptr ptr(new QueryManagementRequest(op, queryId, onFinish)); + ptr->setRefToSelf4keepAlive(ptr); + return ptr; +} + +QueryManagementRequest::QueryManagementRequest(proto::QueryManagement::Operation op, QueryId queryId, + QueryManagementRequest::CallbackType onFinish) + : _op(op), _queryId(queryId), _onFinish(onFinish) { + LOGS(_log, LOG_LVL_DEBUG, "QueryManagementRequest ** CONSTRUCTED **"); +} + +QueryManagementRequest::~QueryManagementRequest() { + LOGS(_log, LOG_LVL_DEBUG, "QueryManagementRequest ** DELETED **"); +} + +void QueryManagementRequest::onRequest(proto::FrameBuffer& buf) { + proto::QueryManagement message; + message.set_op(_op); + message.set_query_id(_queryId); + buf.serialize(message); +} + +void QueryManagementRequest::onResponse(proto::FrameBufferView& view) { + if (nullptr != _onFinish) { + // Clearing the stored callback after finishing the up-stream notification + // has two purposes: + // + // 1. it guaranties (exactly) one time notification + // 2. it breaks the up-stream dependency on a caller object if a shared + // pointer to the object was mentioned as the lambda-function's closure + + auto onFinish = move(_onFinish); + _onFinish = nullptr; + onFinish(Status::SUCCESS, string()); + } +} + +void QueryManagementRequest::onError(string const& error) { + if (nullptr != _onFinish) { + // Clearing the stored callback after finishing the up-stream notification + // has two purposes: + // + // 1. it guaranties (exactly) one time notification + // 2. it breaks the up-stream dependency on a caller object if a shared + // pointer to the object was mentioned as the lambda-function's closure + + auto onFinish = move(_onFinish); + _onFinish = nullptr; + onFinish(Status::ERROR, error); + } +} + +} // namespace lsst::qserv::wpublish diff --git a/src/wpublish/QueryManagementRequest.h b/src/wpublish/QueryManagementRequest.h new file mode 100644 index 0000000000..50287c79a9 --- /dev/null +++ b/src/wpublish/QueryManagementRequest.h @@ -0,0 +1,101 @@ +// -*- LSST-C++ -*- +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ +#ifndef LSST_QSERV_WPUBLISH_QUERY_MANAGEMENT_REQUEST_H +#define LSST_QSERV_WPUBLISH_QUERY_MANAGEMENT_REQUEST_H + +// System headers +#include +#include +#include + +// Qserv headers +#include "global/intTypes.h" +#include "proto/worker.pb.h" +#include "wpublish/QservRequest.h" + +namespace lsst::qserv::wpublish { + +/** + * Class QueryManagementRequest represents requests for managing query + * completion/cancellation at Qserv workers. + * @note No actuall responses are expected from these requests beyond + * the error messages in case of any problems in delivering or processing + * notifications. + */ +class QueryManagementRequest : public QservRequest { +public: + /// Completion status of the operation + enum Status { + SUCCESS, // successful completion of a request + ERROR // an error occurred during command execution + }; + + /// @return string representation of a status + static std::string status2str(Status status); + + /// The pointer type for instances of the class + typedef std::shared_ptr Ptr; + + /// The callback function type to be used for notifications on + /// the operation completion. + using CallbackType = std::function; // error message + + /** + * Static factory method is needed to prevent issues with the lifespan + * and memory management of instances created otherwise (as values or via + * low-level pointers). + * @param op An operation to be initiated. + * @param queryId An uinque identifier of a query affected by the request. + * Note that a cole of the identifier depends on which operation + * was requested. + * @param onFinish (optional) callback function to be called upon the completion + * (successful or not) of the request. + * @return the smart pointer to the object of the class + */ + static Ptr create(proto::QueryManagement::Operation op, QueryId queryId, CallbackType onFinish = nullptr); + + QueryManagementRequest() = delete; + QueryManagementRequest(QueryManagementRequest const&) = delete; + QueryManagementRequest& operator=(QueryManagementRequest const&) = delete; + + virtual ~QueryManagementRequest() override; + +protected: + /// @see QueryManagementRequest::create() + QueryManagementRequest(proto::QueryManagement::Operation op, QueryId queryId, CallbackType onFinish); + + virtual void onRequest(proto::FrameBuffer& buf) override; + virtual void onResponse(proto::FrameBufferView& view) override; + virtual void onError(std::string const& error) override; + +private: + // Parameters of the object + + proto::QueryManagement::Operation _op = proto::QueryManagement::CANCEL_AFTER_RESTART; + QueryId _queryId = 0; + CallbackType _onFinish; +}; + +} // namespace lsst::qserv::wpublish + +#endif // LSST_QSERV_WPUBLISH_QUERY_MANAGEMENT_REQUEST_H diff --git a/src/wpublish/qserv-query-management.cc b/src/wpublish/qserv-query-management.cc new file mode 100644 index 0000000000..d50d0f81ee --- /dev/null +++ b/src/wpublish/qserv-query-management.cc @@ -0,0 +1,151 @@ +// -*- LSST-C++ -*- +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ +// System header +#include +#include +#include +#include +#include + +// Third party headers +#include "XrdSsi/XrdSsiProvider.hh" +#include "XrdSsi/XrdSsiService.hh" + +// Qserv headers +#include "global/intTypes.h" +#include "proto/worker.pb.h" +#include "util/BlockPost.h" +#include "util/CmdLineParser.h" +#include "wpublish/QueryManagementAction.h" +#include "wpublish/QueryManagementRequest.h" + +/// This C++ symbol is provided by the SSI shared library +extern XrdSsiProvider* XrdSsiProviderClient; + +namespace global = lsst::qserv; +namespace proto = lsst::qserv::proto; +namespace util = lsst::qserv::util; +namespace wpublish = lsst::qserv::wpublish; + +using namespace std; + +namespace { + +// Command line parameters + +vector const allowedOperations = {"CANCEL_AFTER_RESTART", "CANCEL", "COMPLETE"}; +proto::QueryManagement::Operation operation = proto::QueryManagement::CANCEL_AFTER_RESTART; +global::QueryId queryId; +bool allWorkers = false; +string serviceProviderLocation; + +proto::QueryManagement::Operation str2operation(string const& str) { + if (str == "CANCEL_AFTER_RESTART") { + return proto::QueryManagement::CANCEL_AFTER_RESTART; + } else if (str == "CANCEL") { + return proto::QueryManagement::CANCEL; + } else if (str == "COMPLETE") { + return proto::QueryManagement::COMPLETE; + } + throw invalid_argument("error: unknown operation '" + str + "'"); +} + +int test() { + bool finished = false; + if (allWorkers) { + wpublish::QueryManagementAction::notifyAllWorkers( + serviceProviderLocation, operation, queryId, + [&finished](wpublish::QueryManagementAction::Response const& response) { + for (auto itr : response) { + cout << "worker: " << itr.first << " error: " << itr.second << endl; + } + finished = true; + }); + } else { + // Connect to a service provider + XrdSsiErrInfo errInfo; + auto serviceProvider = XrdSsiProviderClient->GetService(errInfo, serviceProviderLocation); + if (nullptr == serviceProvider) { + cerr << "failed to contact service provider at: " << serviceProviderLocation + << ", error: " << errInfo.Get() << endl; + return 1; + } + cout << "connected to service provider at: " << serviceProviderLocation << endl; + + // Prepare the request + auto request = wpublish::QueryManagementRequest::create( + operation, queryId, + [&finished](wpublish::QueryManagementRequest::Status status, string const& error) { + cout << "status=" << wpublish::QueryManagementRequest::status2str(status) << ", error='" + << error << "'" << endl; + finished = true; + }); + + // Submit the request + XrdSsiResource resource("/query"); + serviceProvider->ProcessRequest(*request, resource); + } + + // Wait before the request will finish or fail + util::BlockPost blockPost(1000, 2000); + while (!finished) { + blockPost.wait(200); + } + return 0; +} +} // namespace + +int main(int argc, const char* const argv[]) { + // Verify that the version of the library that we linked against is + // compatible with the version of the headers we compiled against. + + GOOGLE_PROTOBUF_VERIFY_VERSION; + + // Parse command line parameters + try { + util::CmdLineParser parser( + argc, argv, + "\n" + "Usage:\n" + " \n" + " [--service=]\n" + "\n" + "Flags an options:\n" + " --all-workers - The flag indicating if the operation had to involve all workers.\n" + " --service= - A location of the service provider (default: 'localhost:1094').\n" + "\n" + "Parameters:\n" + " - An operation over the query (queries). Allowed values of\n" + " the parameter are: CANCEL_AFTER_RESTART, CANCEL, COMPLETE.\n" + " - User query identifier.\n"); + + ::operation = ::str2operation(parser.parameterRestrictedBy(1, ::allowedOperations)); + ::queryId = parser.parameter(2); + ::allWorkers = parser.flag("all-workers"); + ::serviceProviderLocation = parser.option("service", "localhost:1094"); + + } catch (exception const& ex) { + cerr << ex.what() << endl; + return 1; + } + return ::test(); +} diff --git a/src/xrdsvc/SsiProvider.cc b/src/xrdsvc/SsiProvider.cc index 62336749e3..ae490fcac2 100644 --- a/src/xrdsvc/SsiProvider.cc +++ b/src/xrdsvc/SsiProvider.cc @@ -171,6 +171,9 @@ XrdSsiProvider::rStat SsiProviderServer::QueryResource(char const* rName, char c // Tell the caller we don't recognize this worker LOGS(_log, LOG_LVL_DEBUG, "SsiProvider Query " << rName << " absent"); return notPresent; + + } else if (ru.unitType() == ResourceUnit::QUERY) { + return isPresent; } LOGS(_log, LOG_LVL_DEBUG, "SsiProvider Query " << rName << " invalid"); diff --git a/src/xrdsvc/SsiRequest.cc b/src/xrdsvc/SsiRequest.cc index 2e31ea055a..641897d6ad 100644 --- a/src/xrdsvc/SsiRequest.cc +++ b/src/xrdsvc/SsiRequest.cc @@ -205,6 +205,29 @@ void SsiRequest::execute(XrdSsiRequest& req) { } break; } + case ResourceUnit::QUERY: { + LOGS(_log, LOG_LVL_DEBUG, "Parsing request details for resource=" << _resourceName); + proto::QueryManagement request; + try { + // reqData has the entire request, so we can unpack it without waiting for + // more data. + proto::FrameBufferView view(reqData, reqSize); + view.parse(request); + ReleaseRequestBuffer(); + } catch (proto::FrameBufferError const& ex) { + reportError("Failed to decode a query completion/cancellation command, error: " + + std::string(ex.what())); + break; + } + LOGS(_log, LOG_LVL_DEBUG, + "QueryManagement: op=" << proto::QueryManagement_Operation_Name(request.op()) + << " query_id=" << request.query_id()); + + // Send back the empty response since no info is expected by a caller + // for this type of requests beyond the usual error notifications (if any). + this->reply((char const*)0, 0); + break; + } default: reportError("Unexpected unit type '" + std::to_string(ru.unitType()) + "', resource name: " + _resourceName); From 19d6029f5dfe27dd5535ddb481af5a3b0693df59 Mon Sep 17 00:00:00 2001 From: Igor Gaponenko Date: Tue, 13 Jun 2023 14:51:58 -0700 Subject: [PATCH 6/8] Moved the SSI request classes into a dedicated module xrdreq --- src/CMakeLists.txt | 2 + src/ccontrol/CMakeLists.txt | 1 + src/qana/CMakeLists.txt | 1 + src/qdisp/CMakeLists.txt | 1 + src/qproc/CMakeLists.txt | 1 + src/query/CMakeLists.txt | 1 + src/replica/AddReplicaQservMgtRequest.cc | 14 +-- src/replica/AddReplicaQservMgtRequest.h | 4 +- src/replica/CMakeLists.txt | 5 +- src/replica/GetReplicasQservMgtRequest.cc | 14 +-- src/replica/GetReplicasQservMgtRequest.h | 6 +- src/replica/GetStatusQservMgtRequest.cc | 12 +-- src/replica/GetStatusQservMgtRequest.h | 4 +- src/replica/RemoveReplicaQservMgtRequest.cc | 14 +-- src/replica/RemoveReplicaQservMgtRequest.h | 4 +- src/replica/SetReplicasQservMgtRequest.cc | 22 ++--- src/replica/SetReplicasQservMgtRequest.h | 6 +- src/replica/TestEchoQservMgtRequest.cc | 10 +- src/replica/TestEchoQservMgtRequest.h | 4 +- src/rproc/CMakeLists.txt | 1 + src/wpublish/CMakeLists.txt | 30 ------ src/xrdreq/CMakeLists.txt | 52 ++++++++++ .../ChunkGroupQservRequest.cc | 18 ++-- .../ChunkGroupQservRequest.h | 12 +-- .../ChunkListQservRequest.cc | 14 +-- .../ChunkListQservRequest.h | 12 +-- .../GetChunkListQservRequest.cc | 14 +-- .../GetChunkListQservRequest.h | 12 +-- .../GetStatusQservRequest.cc | 8 +- .../GetStatusQservRequest.h | 12 +-- src/{wpublish => xrdreq}/QservRequest.cc | 8 +- src/{wpublish => xrdreq}/QservRequest.h | 10 +- .../QueryManagementAction.cc | 16 +-- .../QueryManagementAction.h | 10 +- .../QueryManagementRequest.cc | 8 +- .../QueryManagementRequest.h | 12 +-- .../SetChunkListQservRequest.cc | 18 ++-- .../SetChunkListQservRequest.h | 12 +-- .../TestEchoQservRequest.cc | 14 +-- .../TestEchoQservRequest.h | 12 +-- .../qserv-query-management.cc | 16 +-- .../qserv-worker-notify.cc | 99 +++++++++---------- .../qserv-worker-perf-chunks.cc | 6 +- src/{wpublish => xrdreq}/qserv-worker-perf.cc | 24 +++-- .../qserv-worker-status.cc | 22 ++--- 45 files changed, 314 insertions(+), 284 deletions(-) create mode 100644 src/xrdreq/CMakeLists.txt rename src/{wpublish => xrdreq}/ChunkGroupQservRequest.cc (91%) rename src/{wpublish => xrdreq}/ChunkGroupQservRequest.h (95%) rename src/{wpublish => xrdreq}/ChunkListQservRequest.cc (93%) rename src/{wpublish => xrdreq}/ChunkListQservRequest.h (95%) rename src/{wpublish => xrdreq}/GetChunkListQservRequest.cc (91%) rename src/{wpublish => xrdreq}/GetChunkListQservRequest.h (92%) rename src/{wpublish => xrdreq}/GetStatusQservRequest.cc (95%) rename src/{wpublish => xrdreq}/GetStatusQservRequest.h (92%) rename src/{wpublish => xrdreq}/QservRequest.cc (97%) rename src/{wpublish => xrdreq}/QservRequest.h (95%) rename src/{wpublish => xrdreq}/QueryManagementAction.cc (91%) rename src/{wpublish => xrdreq}/QueryManagementAction.h (93%) rename src/{wpublish => xrdreq}/QueryManagementRequest.cc (94%) rename src/{wpublish => xrdreq}/QueryManagementRequest.h (92%) rename src/{wpublish => xrdreq}/SetChunkListQservRequest.cc (90%) rename src/{wpublish => xrdreq}/SetChunkListQservRequest.h (93%) rename src/{wpublish => xrdreq}/TestEchoQservRequest.cc (91%) rename src/{wpublish => xrdreq}/TestEchoQservRequest.h (92%) rename src/{wpublish => xrdreq}/qserv-query-management.cc (89%) rename src/{wpublish => xrdreq}/qserv-worker-notify.cc (73%) rename src/{wpublish => xrdreq}/qserv-worker-perf-chunks.cc (99%) rename src/{wpublish => xrdreq}/qserv-worker-perf.cc (85%) rename src/{wpublish => xrdreq}/qserv-worker-status.cc (87%) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 59ebe896c0..c4ce354f95 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -74,6 +74,7 @@ add_subdirectory(wpublish) add_subdirectory(wsched) add_subdirectory(www) add_subdirectory(xrdlog) +add_subdirectory(xrdreq) add_subdirectory(xrdsvc) #----------------------------------------------------------------------------- @@ -127,6 +128,7 @@ target_link_libraries(qserv_czar PUBLIC rproc qserv_css qserv_meta + xrdreq ) install( diff --git a/src/ccontrol/CMakeLists.txt b/src/ccontrol/CMakeLists.txt index 18805d2cdd..91803fca81 100644 --- a/src/ccontrol/CMakeLists.txt +++ b/src/ccontrol/CMakeLists.txt @@ -45,6 +45,7 @@ FUNCTION(ccontrol_tests) qserv_meta query rproc + xrdreq Boost::unit_test_framework Threads::Threads ) diff --git a/src/qana/CMakeLists.txt b/src/qana/CMakeLists.txt index 11945e532e..ccf5634e4c 100644 --- a/src/qana/CMakeLists.txt +++ b/src/qana/CMakeLists.txt @@ -35,6 +35,7 @@ FUNCTION(qana_tests) qserv_css qserv_meta rproc + xrdreq Boost::unit_test_framework Threads::Threads ) diff --git a/src/qdisp/CMakeLists.txt b/src/qdisp/CMakeLists.txt index f4c4f7e0e6..1f34bdd971 100644 --- a/src/qdisp/CMakeLists.txt +++ b/src/qdisp/CMakeLists.txt @@ -41,6 +41,7 @@ target_link_libraries(testQDisp qserv_meta query rproc + xrdreq Boost::unit_test_framework Threads::Threads ) diff --git a/src/qproc/CMakeLists.txt b/src/qproc/CMakeLists.txt index bf1aa04511..1a022694f6 100644 --- a/src/qproc/CMakeLists.txt +++ b/src/qproc/CMakeLists.txt @@ -30,6 +30,7 @@ FUNCTION(qproc_tests) qserv_css qserv_meta rproc + xrdreq Boost::unit_test_framework Threads::Threads ) diff --git a/src/query/CMakeLists.txt b/src/query/CMakeLists.txt index 28a5207e9f..90c351aad2 100644 --- a/src/query/CMakeLists.txt +++ b/src/query/CMakeLists.txt @@ -59,6 +59,7 @@ FUNCTION(query_tests) qserv_meta query rproc + xrdreq Boost::unit_test_framework Threads::Threads ) diff --git a/src/replica/AddReplicaQservMgtRequest.cc b/src/replica/AddReplicaQservMgtRequest.cc index a2f79d66b3..80458cba0d 100644 --- a/src/replica/AddReplicaQservMgtRequest.cc +++ b/src/replica/AddReplicaQservMgtRequest.cc @@ -73,9 +73,9 @@ list> AddReplicaQservMgtRequest::extendedPersistentState() void AddReplicaQservMgtRequest::startImpl(replica::Lock const& lock) { auto const request = shared_from_base(); - _qservRequest = wpublish::AddChunkGroupQservRequest::create( + _qservRequest = xrdreq::AddChunkGroupQservRequest::create( chunk(), databases(), - [request](wpublish::ChunkGroupQservRequest::Status status, string const& error) { + [request](xrdreq::ChunkGroupQservRequest::Status status, string const& error) { if (request->state() == State::FINISHED) return; replica::Lock lock(request->_mtx, request->context() + string(__func__) + "[callback]"); @@ -83,26 +83,26 @@ void AddReplicaQservMgtRequest::startImpl(replica::Lock const& lock) { if (request->state() == State::FINISHED) return; switch (status) { - case wpublish::ChunkGroupQservRequest::Status::SUCCESS: + case xrdreq::ChunkGroupQservRequest::Status::SUCCESS: request->finish(lock, QservMgtRequest::ExtendedState::SUCCESS); break; - case wpublish::ChunkGroupQservRequest::Status::INVALID: + case xrdreq::ChunkGroupQservRequest::Status::INVALID: request->finish(lock, QservMgtRequest::ExtendedState::SERVER_BAD, error); break; - case wpublish::ChunkGroupQservRequest::Status::IN_USE: + case xrdreq::ChunkGroupQservRequest::Status::IN_USE: request->finish(lock, QservMgtRequest::ExtendedState::SERVER_CHUNK_IN_USE, error); break; - case wpublish::ChunkGroupQservRequest::Status::ERROR: + case xrdreq::ChunkGroupQservRequest::Status::ERROR: request->finish(lock, QservMgtRequest::ExtendedState::SERVER_ERROR, error); break; default: throw logic_error("AddReplicaQservMgtRequest::" + string(__func__) + " unhandled server status: " + - wpublish::ChunkGroupQservRequest::status2str(status)); + xrdreq::ChunkGroupQservRequest::status2str(status)); } }); XrdSsiResource resource(ResourceUnit::makeWorkerPath(worker())); diff --git a/src/replica/AddReplicaQservMgtRequest.h b/src/replica/AddReplicaQservMgtRequest.h index da45ebc1de..ba1c3b5509 100644 --- a/src/replica/AddReplicaQservMgtRequest.h +++ b/src/replica/AddReplicaQservMgtRequest.h @@ -29,7 +29,7 @@ // Qserv headers #include "replica/QservMgtRequest.h" #include "replica/ServiceProvider.h" -#include "wpublish/ChunkGroupQservRequest.h" +#include "xrdreq/ChunkGroupQservRequest.h" // This header declarations @@ -101,7 +101,7 @@ class AddReplicaQservMgtRequest : public QservMgtRequest { CallbackType _onFinish; /// @note is reset when the request finishes /// A request to the remote services - wpublish::AddChunkGroupQservRequest::Ptr _qservRequest; + xrdreq::AddChunkGroupQservRequest::Ptr _qservRequest; }; } // namespace lsst::qserv::replica diff --git a/src/replica/CMakeLists.txt b/src/replica/CMakeLists.txt index ba4920e643..cd9163d188 100644 --- a/src/replica/CMakeLists.txt +++ b/src/replica/CMakeLists.txt @@ -463,6 +463,7 @@ target_include_directories(replica PRIVATE target_link_libraries(replica PUBLIC qserv_css + xrdreq xrdsvc XrdCl XrdSsiLib @@ -483,7 +484,9 @@ FUNCTION(REPLICA_UTILS) add_executable(${UTIL}) target_sources(${UTIL} PRIVATE tools/${UTIL}.cc) target_include_directories(${UTIL} PRIVATE ${XROOTD_INCLUDE_DIRS}) - target_link_libraries(${UTIL} PRIVATE replica) + target_link_libraries(${UTIL} PRIVATE + replica + ) install(TARGETS ${UTIL}) ENDFOREACH() ENDFUNCTION() diff --git a/src/replica/GetReplicasQservMgtRequest.cc b/src/replica/GetReplicasQservMgtRequest.cc index 36f230b0c1..6b10a39fab 100644 --- a/src/replica/GetReplicasQservMgtRequest.cc +++ b/src/replica/GetReplicasQservMgtRequest.cc @@ -80,7 +80,7 @@ list> GetReplicasQservMgtRequest::extendedPersistentState() } void GetReplicasQservMgtRequest::_setReplicas( - replica::Lock const& lock, wpublish::GetChunkListQservRequest::ChunkCollection const& collection) { + replica::Lock const& lock, xrdreq::GetChunkListQservRequest::ChunkCollection const& collection) { // Filter results by databases participating in the family set databases; @@ -111,9 +111,9 @@ void GetReplicasQservMgtRequest::startImpl(replica::Lock const& lock) { auto const request = shared_from_base(); - _qservRequest = wpublish::GetChunkListQservRequest::create( - inUseOnly(), [request](wpublish::GetChunkListQservRequest::Status status, string const& error, - wpublish::GetChunkListQservRequest::ChunkCollection const& collection) { + _qservRequest = xrdreq::GetChunkListQservRequest::create( + inUseOnly(), [request](xrdreq::GetChunkListQservRequest::Status status, string const& error, + xrdreq::GetChunkListQservRequest::ChunkCollection const& collection) { if (request->state() == State::FINISHED) return; replica::Lock lock(request->_mtx, request->context() + string(__func__) + "[callback]"); @@ -121,13 +121,13 @@ void GetReplicasQservMgtRequest::startImpl(replica::Lock const& lock) { if (request->state() == State::FINISHED) return; switch (status) { - case wpublish::GetChunkListQservRequest::Status::SUCCESS: + case xrdreq::GetChunkListQservRequest::Status::SUCCESS: request->_setReplicas(lock, collection); request->finish(lock, QservMgtRequest::ExtendedState::SUCCESS); break; - case wpublish::GetChunkListQservRequest::Status::ERROR: + case xrdreq::GetChunkListQservRequest::Status::ERROR: request->finish(lock, QservMgtRequest::ExtendedState::SERVER_ERROR, error); break; @@ -135,7 +135,7 @@ void GetReplicasQservMgtRequest::startImpl(replica::Lock const& lock) { default: throw logic_error("GetReplicasQservMgtRequest::" + string(__func__) + " unhandled server status: " + - wpublish::GetChunkListQservRequest::status2str(status)); + xrdreq::GetChunkListQservRequest::status2str(status)); } }); XrdSsiResource resource(ResourceUnit::makeWorkerPath(worker())); diff --git a/src/replica/GetReplicasQservMgtRequest.h b/src/replica/GetReplicasQservMgtRequest.h index b8864e1719..508bf75cc7 100644 --- a/src/replica/GetReplicasQservMgtRequest.h +++ b/src/replica/GetReplicasQservMgtRequest.h @@ -30,7 +30,7 @@ #include "replica/QservMgtRequest.h" #include "replica/ReplicaInfo.h" #include "replica/ServiceProvider.h" -#include "wpublish/GetChunkListQservRequest.h" +#include "xrdreq/GetChunkListQservRequest.h" // This header declarations namespace lsst::qserv::replica { @@ -109,7 +109,7 @@ class GetReplicasQservMgtRequest : public QservMgtRequest { * @param collection The input collection of replicas. */ void _setReplicas(replica::Lock const& lock, - wpublish::GetChunkListQservRequest::ChunkCollection const& collection); + xrdreq::GetChunkListQservRequest::ChunkCollection const& collection); // Input parameters @@ -118,7 +118,7 @@ class GetReplicasQservMgtRequest : public QservMgtRequest { CallbackType _onFinish; /// @note is reset when the request finishes /// A request to the remote services - wpublish::GetChunkListQservRequest::Ptr _qservRequest; + xrdreq::GetChunkListQservRequest::Ptr _qservRequest; /// A collection of replicas reported by the Qserr worker QservReplicaCollection _replicas; diff --git a/src/replica/GetStatusQservMgtRequest.cc b/src/replica/GetStatusQservMgtRequest.cc index aa3483a580..ba43289a61 100644 --- a/src/replica/GetStatusQservMgtRequest.cc +++ b/src/replica/GetStatusQservMgtRequest.cc @@ -79,15 +79,15 @@ list> GetStatusQservMgtRequest::extendedPersistentState() c void GetStatusQservMgtRequest::startImpl(replica::Lock const& lock) { auto const request = shared_from_base(); - _qservRequest = wpublish::GetStatusQservRequest::create( - _taskSelector, [request](wpublish::GetStatusQservRequest::Status status, string const& error, - string const& info) { + _qservRequest = xrdreq::GetStatusQservRequest::create( + _taskSelector, + [request](xrdreq::GetStatusQservRequest::Status status, string const& error, string const& info) { if (request->state() == State::FINISHED) return; replica::Lock const lock(request->_mtx, request->context() + string(__func__) + "[callback]"); if (request->state() == State::FINISHED) return; switch (status) { - case wpublish::GetStatusQservRequest::Status::SUCCESS: + case xrdreq::GetStatusQservRequest::Status::SUCCESS: try { request->_setInfo(lock, info); request->finish(lock, QservMgtRequest::ExtendedState::SUCCESS); @@ -98,13 +98,13 @@ void GetStatusQservMgtRequest::startImpl(replica::Lock const& lock) { request->finish(lock, QservMgtRequest::ExtendedState::SERVER_BAD_RESPONSE, msg); } break; - case wpublish::GetStatusQservRequest::Status::ERROR: + case xrdreq::GetStatusQservRequest::Status::ERROR: request->finish(lock, QservMgtRequest::ExtendedState::SERVER_ERROR, error); break; default: throw logic_error("GetStatusQservMgtRequest::" + string(__func__) + " unhandled server status: " + - wpublish::GetStatusQservRequest::status2str(status)); + xrdreq::GetStatusQservRequest::status2str(status)); } }); XrdSsiResource resource(ResourceUnit::makeWorkerPath(worker())); diff --git a/src/replica/GetStatusQservMgtRequest.h b/src/replica/GetStatusQservMgtRequest.h index ca9a6e5e8b..4242906d53 100644 --- a/src/replica/GetStatusQservMgtRequest.h +++ b/src/replica/GetStatusQservMgtRequest.h @@ -33,7 +33,7 @@ #include "replica/QservMgtRequest.h" #include "replica/ServiceProvider.h" #include "wbase/TaskState.h" -#include "wpublish/GetStatusQservRequest.h" +#include "xrdreq/GetStatusQservRequest.h" // This header declarations namespace lsst::qserv::replica { @@ -109,7 +109,7 @@ class GetStatusQservMgtRequest : public QservMgtRequest { CallbackType _onFinish; ///< this object is reset after finishing the request /// A request to the remote services - wpublish::GetStatusQservRequest::Ptr _qservRequest; + xrdreq::GetStatusQservRequest::Ptr _qservRequest; /// The info object returned by the Qserv worker nlohmann::json _info; diff --git a/src/replica/RemoveReplicaQservMgtRequest.cc b/src/replica/RemoveReplicaQservMgtRequest.cc index 2d89b2f14e..83e4de4f30 100644 --- a/src/replica/RemoveReplicaQservMgtRequest.cc +++ b/src/replica/RemoveReplicaQservMgtRequest.cc @@ -76,9 +76,9 @@ list> RemoveReplicaQservMgtRequest::extendedPersistentState void RemoveReplicaQservMgtRequest::startImpl(replica::Lock const& lock) { auto const request = shared_from_base(); - _qservRequest = wpublish::RemoveChunkGroupQservRequest::create( + _qservRequest = xrdreq::RemoveChunkGroupQservRequest::create( chunk(), databases(), force(), - [request](wpublish::ChunkGroupQservRequest::Status status, string const& error) { + [request](xrdreq::ChunkGroupQservRequest::Status status, string const& error) { if (request->state() == State::FINISHED) return; replica::Lock lock(request->_mtx, request->context() + string(__func__) + "[callback]"); @@ -86,26 +86,26 @@ void RemoveReplicaQservMgtRequest::startImpl(replica::Lock const& lock) { if (request->state() == State::FINISHED) return; switch (status) { - case wpublish::ChunkGroupQservRequest::Status::SUCCESS: + case xrdreq::ChunkGroupQservRequest::Status::SUCCESS: request->finish(lock, QservMgtRequest::ExtendedState::SUCCESS); break; - case wpublish::ChunkGroupQservRequest::Status::INVALID: + case xrdreq::ChunkGroupQservRequest::Status::INVALID: request->finish(lock, QservMgtRequest::ExtendedState::SERVER_BAD, error); break; - case wpublish::ChunkGroupQservRequest::Status::IN_USE: + case xrdreq::ChunkGroupQservRequest::Status::IN_USE: request->finish(lock, QservMgtRequest::ExtendedState::SERVER_CHUNK_IN_USE, error); break; - case wpublish::ChunkGroupQservRequest::Status::ERROR: + case xrdreq::ChunkGroupQservRequest::Status::ERROR: request->finish(lock, QservMgtRequest::ExtendedState::SERVER_ERROR, error); break; default: throw logic_error("RemoveReplicaQservMgtRequest::" + string(__func__) + " unhandled server status: " + - wpublish::ChunkGroupQservRequest::status2str(status)); + xrdreq::ChunkGroupQservRequest::status2str(status)); } }); XrdSsiResource resource(ResourceUnit::makeWorkerPath(worker())); diff --git a/src/replica/RemoveReplicaQservMgtRequest.h b/src/replica/RemoveReplicaQservMgtRequest.h index d6a06c6518..77f4a379a4 100644 --- a/src/replica/RemoveReplicaQservMgtRequest.h +++ b/src/replica/RemoveReplicaQservMgtRequest.h @@ -29,7 +29,7 @@ // Qserv headers #include "replica/QservMgtRequest.h" #include "replica/ServiceProvider.h" -#include "wpublish/ChunkGroupQservRequest.h" +#include "xrdreq/ChunkGroupQservRequest.h" // This header declarations namespace lsst::qserv::replica { @@ -107,7 +107,7 @@ class RemoveReplicaQservMgtRequest : public QservMgtRequest { CallbackType _onFinish; /// A request to the remote services - wpublish::RemoveChunkGroupQservRequest::Ptr _qservRequest; + xrdreq::RemoveChunkGroupQservRequest::Ptr _qservRequest; }; } // namespace lsst::qserv::replica diff --git a/src/replica/SetReplicasQservMgtRequest.cc b/src/replica/SetReplicasQservMgtRequest.cc index e031ab67d3..f5badd1f95 100644 --- a/src/replica/SetReplicasQservMgtRequest.cc +++ b/src/replica/SetReplicasQservMgtRequest.cc @@ -88,7 +88,7 @@ list> SetReplicasQservMgtRequest::extendedPersistentState() } void SetReplicasQservMgtRequest::_setReplicas( - replica::Lock const& lock, wpublish::SetChunkListQservRequest::ChunkCollection const& collection) { + replica::Lock const& lock, xrdreq::SetChunkListQservRequest::ChunkCollection const& collection) { _replicas.clear(); for (auto&& replica : collection) { _replicas.push_back(QservReplica{replica.chunk, replica.database, replica.use_count}); @@ -96,18 +96,18 @@ void SetReplicasQservMgtRequest::_setReplicas( } void SetReplicasQservMgtRequest::startImpl(replica::Lock const& lock) { - wpublish::SetChunkListQservRequest::ChunkCollection chunks; + xrdreq::SetChunkListQservRequest::ChunkCollection chunks; for (auto&& chunkEntry : newReplicas()) { - chunks.push_back(wpublish::SetChunkListQservRequest::Chunk{ + chunks.push_back(xrdreq::SetChunkListQservRequest::Chunk{ chunkEntry.chunk, chunkEntry.database, 0 /* UNUSED: use_count */ }); } auto const request = shared_from_base(); - _qservRequest = wpublish::SetChunkListQservRequest::create( + _qservRequest = xrdreq::SetChunkListQservRequest::create( chunks, _databases, force(), - [request](wpublish::SetChunkListQservRequest::Status status, string const& error, - wpublish::SetChunkListQservRequest::ChunkCollection const& collection) { + [request](xrdreq::SetChunkListQservRequest::Status status, string const& error, + xrdreq::SetChunkListQservRequest::ChunkCollection const& collection) { if (request->state() == State::FINISHED) return; replica::Lock lock(request->_mtx, request->context() + string(__func__) + "[callback]"); @@ -115,27 +115,27 @@ void SetReplicasQservMgtRequest::startImpl(replica::Lock const& lock) { if (request->state() == State::FINISHED) return; switch (status) { - case wpublish::SetChunkListQservRequest::Status::SUCCESS: + case xrdreq::SetChunkListQservRequest::Status::SUCCESS: request->_setReplicas(lock, collection); request->finish(lock, QservMgtRequest::ExtendedState::SUCCESS); break; - case wpublish::SetChunkListQservRequest::Status::ERROR: + case xrdreq::SetChunkListQservRequest::Status::ERROR: request->finish(lock, QservMgtRequest::ExtendedState::SERVER_ERROR, error); break; - case wpublish::SetChunkListQservRequest::Status::INVALID: + case xrdreq::SetChunkListQservRequest::Status::INVALID: request->finish(lock, QservMgtRequest::ExtendedState::SERVER_BAD, error); break; - case wpublish::SetChunkListQservRequest::Status::IN_USE: + case xrdreq::SetChunkListQservRequest::Status::IN_USE: request->finish(lock, QservMgtRequest::ExtendedState::SERVER_CHUNK_IN_USE, error); break; default: throw logic_error("SetReplicasQservMgtRequest:: " + string(__func__) + " unhandled server status: " + - wpublish::SetChunkListQservRequest::status2str(status)); + xrdreq::SetChunkListQservRequest::status2str(status)); } }); XrdSsiResource resource(ResourceUnit::makeWorkerPath(worker())); diff --git a/src/replica/SetReplicasQservMgtRequest.h b/src/replica/SetReplicasQservMgtRequest.h index 49b9057b94..4da0d1d595 100644 --- a/src/replica/SetReplicasQservMgtRequest.h +++ b/src/replica/SetReplicasQservMgtRequest.h @@ -30,7 +30,7 @@ #include "replica/QservMgtRequest.h" #include "replica/ReplicaInfo.h" #include "replica/ServiceProvider.h" -#include "wpublish/SetChunkListQservRequest.h" +#include "xrdreq/SetChunkListQservRequest.h" // This header declarations namespace lsst::qserv::replica { @@ -109,7 +109,7 @@ class SetReplicasQservMgtRequest : public QservMgtRequest { * @param collection The input collection of replicas. */ void _setReplicas(replica::Lock const& lock, - wpublish::SetChunkListQservRequest::ChunkCollection const& collection); + xrdreq::SetChunkListQservRequest::ChunkCollection const& collection); // Input parameters. @@ -120,7 +120,7 @@ class SetReplicasQservMgtRequest : public QservMgtRequest { CallbackType _onFinish; /// A request to the remote services. - wpublish::SetChunkListQservRequest::Ptr _qservRequest; + xrdreq::SetChunkListQservRequest::Ptr _qservRequest; /// A collection of replicas reported by the Qserv worker. QservReplicaCollection _replicas; diff --git a/src/replica/TestEchoQservMgtRequest.cc b/src/replica/TestEchoQservMgtRequest.cc index 04f4c48aad..b84988de56 100644 --- a/src/replica/TestEchoQservMgtRequest.cc +++ b/src/replica/TestEchoQservMgtRequest.cc @@ -78,8 +78,8 @@ void TestEchoQservMgtRequest::startImpl(replica::Lock const& lock) { auto const request = shared_from_base(); - _qservRequest = wpublish::TestEchoQservRequest::create( - data(), [request](wpublish::TestEchoQservRequest::Status status, string const& error, + _qservRequest = xrdreq::TestEchoQservRequest::create( + data(), [request](xrdreq::TestEchoQservRequest::Status status, string const& error, string const& data, string const& dataEcho) { if (request->state() == State::FINISHED) return; @@ -88,13 +88,13 @@ void TestEchoQservMgtRequest::startImpl(replica::Lock const& lock) { if (request->state() == State::FINISHED) return; switch (status) { - case wpublish::TestEchoQservRequest::Status::SUCCESS: + case xrdreq::TestEchoQservRequest::Status::SUCCESS: request->_setData(lock, dataEcho); request->finish(lock, QservMgtRequest::ExtendedState::SUCCESS); break; - case wpublish::TestEchoQservRequest::Status::ERROR: + case xrdreq::TestEchoQservRequest::Status::ERROR: request->finish(lock, QservMgtRequest::ExtendedState::SERVER_ERROR, error); break; @@ -102,7 +102,7 @@ void TestEchoQservMgtRequest::startImpl(replica::Lock const& lock) { default: throw logic_error("TestEchoQservMgtRequest::" + string(__func__) + " unhandled server status: " + - wpublish::TestEchoQservRequest::status2str(status)); + xrdreq::TestEchoQservRequest::status2str(status)); } }); XrdSsiResource resource(ResourceUnit::makeWorkerPath(worker())); diff --git a/src/replica/TestEchoQservMgtRequest.h b/src/replica/TestEchoQservMgtRequest.h index 4bbdd6bae0..ba0113bcce 100644 --- a/src/replica/TestEchoQservMgtRequest.h +++ b/src/replica/TestEchoQservMgtRequest.h @@ -29,7 +29,7 @@ // Qserv headers #include "replica/QservMgtRequest.h" #include "replica/ServiceProvider.h" -#include "wpublish/TestEchoQservRequest.h" +#include "xrdreq/TestEchoQservRequest.h" // This header declarations namespace lsst::qserv::replica { @@ -107,7 +107,7 @@ class TestEchoQservMgtRequest : public QservMgtRequest { CallbackType _onFinish; ///< @note this object is reset after finishing the request /// A request to the remote services - wpublish::TestEchoQservRequest::Ptr _qservRequest; + xrdreq::TestEchoQservRequest::Ptr _qservRequest; /// The data string returned by the Qserv worker std::string _dataEcho; diff --git a/src/rproc/CMakeLists.txt b/src/rproc/CMakeLists.txt index 98c8e7a4b4..91910dd95e 100644 --- a/src/rproc/CMakeLists.txt +++ b/src/rproc/CMakeLists.txt @@ -28,6 +28,7 @@ FUNCTION(rproc_tests) qserv_css qserv_meta rproc + xrdreq Boost::unit_test_framework Threads::Threads ) diff --git a/src/wpublish/CMakeLists.txt b/src/wpublish/CMakeLists.txt index 3d76fd102c..985020656e 100644 --- a/src/wpublish/CMakeLists.txt +++ b/src/wpublish/CMakeLists.txt @@ -3,24 +3,15 @@ add_dependencies(wpublish proto) target_sources(wpublish PRIVATE AddChunkGroupCommand.cc - ChunkGroupQservRequest.cc ChunkInventory.cc ChunkListCommand.cc - ChunkListQservRequest.cc GetChunkListCommand.cc - GetChunkListQservRequest.cc GetStatusCommand.cc - GetStatusQservRequest.cc - QservRequest.cc QueriesAndChunks.cc - QueryManagementAction.cc - QueryManagementRequest.cc RemoveChunkGroupCommand.cc ResourceMonitor.cc SetChunkListCommand.cc - SetChunkListQservRequest.cc TestEchoCommand.cc - TestEchoQservRequest.cc ) target_include_directories(wpublish PRIVATE @@ -45,27 +36,6 @@ target_link_libraries(testChunkInventory PUBLIC add_test(NAME testChunkInventory COMMAND testChunkInventory) - -FUNCTION(WPUBLISH_UTILS) - FOREACH(UTIL IN ITEMS ${ARGV}) - add_executable(${UTIL}) - target_sources(${UTIL} PRIVATE ${UTIL}.cc) - target_include_directories(${UTIL} PRIVATE ${XROOTD_INCLUDE_DIRS}) - target_link_libraries(${UTIL} PRIVATE - crypto - pthread - util - wpublish - xrdsvc - ) - install(TARGETS ${UTIL}) - ENDFOREACH() -ENDFUNCTION() - -wpublish_utils( - qserv-query-management -) - install( TARGETS wpublish ) diff --git a/src/xrdreq/CMakeLists.txt b/src/xrdreq/CMakeLists.txt new file mode 100644 index 0000000000..6aa8cf95dc --- /dev/null +++ b/src/xrdreq/CMakeLists.txt @@ -0,0 +1,52 @@ +add_library(xrdreq OBJECT) +add_dependencies(xrdreq proto) + +target_sources(xrdreq PRIVATE + ChunkGroupQservRequest.cc + ChunkListQservRequest.cc + GetChunkListQservRequest.cc + GetStatusQservRequest.cc + QservRequest.cc + QueryManagementAction.cc + QueryManagementRequest.cc + SetChunkListQservRequest.cc + TestEchoQservRequest.cc +) + +target_include_directories(xrdreq PRIVATE + ${XROOTD_INCLUDE_DIRS} +) + +target_link_libraries(xrdreq PUBLIC + log + proto + protobuf + XrdSsiLib + XrdCl +) + +FUNCTION(XRDREQ_UTILS) + FOREACH(UTIL IN ITEMS ${ARGV}) + add_executable(${UTIL}) + target_sources(${UTIL} PRIVATE ${UTIL}.cc) + target_include_directories(${UTIL} PRIVATE ${XROOTD_INCLUDE_DIRS}) + target_link_libraries(${UTIL} PRIVATE + crypto + pthread + proto + util + global + xrdreq + ) + install(TARGETS ${UTIL}) + ENDFOREACH() +ENDFUNCTION() + +xrdreq_utils( + qserv-query-management + qserv-worker-perf +) + +install( + TARGETS xrdreq +) diff --git a/src/wpublish/ChunkGroupQservRequest.cc b/src/xrdreq/ChunkGroupQservRequest.cc similarity index 91% rename from src/wpublish/ChunkGroupQservRequest.cc rename to src/xrdreq/ChunkGroupQservRequest.cc index 0da4d7fd48..b5bd2f291d 100644 --- a/src/wpublish/ChunkGroupQservRequest.cc +++ b/src/xrdreq/ChunkGroupQservRequest.cc @@ -21,7 +21,7 @@ */ // Class header -#include "wpublish/ChunkGroupQservRequest.h" +#include "xrdreq/ChunkGroupQservRequest.h" // System headers #include @@ -34,20 +34,20 @@ using namespace std; namespace { -LOG_LOGGER _log = LOG_GET("lsst.qserv.wpublish.ChunkGroupQservRequest"); +LOG_LOGGER _log = LOG_GET("lsst.qserv.xrdreq.ChunkGroupQservRequest"); using namespace lsst::qserv; -wpublish::ChunkGroupQservRequest::Status translate(proto::WorkerCommandChunkGroupR::Status status) { +xrdreq::ChunkGroupQservRequest::Status translate(proto::WorkerCommandChunkGroupR::Status status) { switch (status) { case proto::WorkerCommandChunkGroupR::SUCCESS: - return wpublish::ChunkGroupQservRequest::SUCCESS; + return xrdreq::ChunkGroupQservRequest::SUCCESS; case proto::WorkerCommandChunkGroupR::INVALID: - return wpublish::ChunkGroupQservRequest::INVALID; + return xrdreq::ChunkGroupQservRequest::INVALID; case proto::WorkerCommandChunkGroupR::IN_USE: - return wpublish::ChunkGroupQservRequest::IN_USE; + return xrdreq::ChunkGroupQservRequest::IN_USE; case proto::WorkerCommandChunkGroupR::ERROR: - return wpublish::ChunkGroupQservRequest::ERROR; + return xrdreq::ChunkGroupQservRequest::ERROR; } throw domain_error("ChunkGroupQservRequest::" + string(__func__) + " no match for Protobuf status: " + proto::WorkerCommandChunkGroupR_Status_Name(status)); @@ -55,7 +55,7 @@ wpublish::ChunkGroupQservRequest::Status translate(proto::WorkerCommandChunkGrou } // namespace -namespace lsst::qserv::wpublish { +namespace lsst::qserv::xrdreq { string ChunkGroupQservRequest::status2str(Status status) { switch (status) { @@ -161,4 +161,4 @@ RemoveChunkGroupQservRequest::RemoveChunkGroupQservRequest(unsigned int chunk, CallbackType onFinish) : ChunkGroupQservRequest(false, chunk, databases, force, onFinish) {} -} // namespace lsst::qserv::wpublish +} // namespace lsst::qserv::xrdreq diff --git a/src/wpublish/ChunkGroupQservRequest.h b/src/xrdreq/ChunkGroupQservRequest.h similarity index 95% rename from src/wpublish/ChunkGroupQservRequest.h rename to src/xrdreq/ChunkGroupQservRequest.h index 5792e8768b..2927ba3f71 100644 --- a/src/wpublish/ChunkGroupQservRequest.h +++ b/src/xrdreq/ChunkGroupQservRequest.h @@ -21,8 +21,8 @@ * see . */ /// ChunkGroupQservRequest.h -#ifndef LSST_QSERV_WPUBLISH_CHUNK_GROUP_QSERV_REQUEST_H -#define LSST_QSERV_WPUBLISH_CHUNK_GROUP_QSERV_REQUEST_H +#ifndef LSST_QSERV_XRDREQ_CHUNK_GROUP_QSERV_REQUEST_H +#define LSST_QSERV_XRDREQ_CHUNK_GROUP_QSERV_REQUEST_H // System headers #include @@ -30,10 +30,10 @@ #include // Qserv headers -#include "wpublish/QservRequest.h" +#include "xrdreq/QservRequest.h" // This header declarations -namespace lsst::qserv::wpublish { +namespace lsst::qserv::xrdreq { /** * Class ChunkGroupQservRequest implements a client-side request to @@ -170,6 +170,6 @@ class RemoveChunkGroupQservRequest : public ChunkGroupQservRequest { CallbackType onFinish); }; -} // namespace lsst::qserv::wpublish +} // namespace lsst::qserv::xrdreq -#endif // LSST_QSERV_WPUBLISH_CHUNK_GROUP_QSERV_REQUEST_H +#endif // LSST_QSERV_XRDREQ_CHUNK_GROUP_QSERV_REQUEST_H diff --git a/src/wpublish/ChunkListQservRequest.cc b/src/xrdreq/ChunkListQservRequest.cc similarity index 93% rename from src/wpublish/ChunkListQservRequest.cc rename to src/xrdreq/ChunkListQservRequest.cc index b5c0016d17..16914cc567 100644 --- a/src/wpublish/ChunkListQservRequest.cc +++ b/src/xrdreq/ChunkListQservRequest.cc @@ -21,7 +21,7 @@ */ // Class header -#include "wpublish/ChunkListQservRequest.h" +#include "xrdreq/ChunkListQservRequest.h" // System headers #include @@ -35,21 +35,21 @@ using namespace std; namespace { -LOG_LOGGER _log = LOG_GET("lsst.qserv.wpublish.ChunkListQservRequest"); +LOG_LOGGER _log = LOG_GET("lsst.qserv.xrdreq.ChunkListQservRequest"); -wpublish::ChunkListQservRequest::Status translate(proto::WorkerCommandUpdateChunkListR::Status status) { +xrdreq::ChunkListQservRequest::Status translate(proto::WorkerCommandUpdateChunkListR::Status status) { switch (status) { case proto::WorkerCommandUpdateChunkListR::SUCCESS: - return wpublish::ChunkListQservRequest::SUCCESS; + return xrdreq::ChunkListQservRequest::SUCCESS; case proto::WorkerCommandUpdateChunkListR::ERROR: - return wpublish::ChunkListQservRequest::ERROR; + return xrdreq::ChunkListQservRequest::ERROR; } throw domain_error("ChunkListQservRequest::translate no match for Protobuf status: " + proto::WorkerCommandUpdateChunkListR_Status_Name(status)); } } // namespace -namespace lsst::qserv::wpublish { +namespace lsst::qserv::xrdreq { string ChunkListQservRequest::status2str(Status status) { switch (status) { @@ -161,4 +161,4 @@ RebuildChunkListQservRequest::RebuildChunkListQservRequest(bool reload, ChunkListQservRequest::CallbackType onFinish) : ChunkListQservRequest(true, reload, onFinish) {} -} // namespace lsst::qserv::wpublish +} // namespace lsst::qserv::xrdreq diff --git a/src/wpublish/ChunkListQservRequest.h b/src/xrdreq/ChunkListQservRequest.h similarity index 95% rename from src/wpublish/ChunkListQservRequest.h rename to src/xrdreq/ChunkListQservRequest.h index b637c5d671..35f451b342 100644 --- a/src/wpublish/ChunkListQservRequest.h +++ b/src/xrdreq/ChunkListQservRequest.h @@ -20,8 +20,8 @@ * the GNU General Public License along with this program. If not, * see . */ -#ifndef LSST_QSERV_WPUBLISH_CHUNK_LIST_QSERV_REQUEST_H -#define LSST_QSERV_WPUBLISH_CHUNK_LIST_QSERV_REQUEST_H +#ifndef LSST_QSERV_XRDREQ_CHUNK_LIST_QSERV_REQUEST_H +#define LSST_QSERV_XRDREQ_CHUNK_LIST_QSERV_REQUEST_H // System headers #include @@ -29,9 +29,9 @@ #include // Qserv headers -#include "wpublish/QservRequest.h" +#include "xrdreq/QservRequest.h" -namespace lsst::qserv::wpublish { +namespace lsst::qserv::xrdreq { /** * Class ChunkListQservRequest the base class for client-side requests @@ -165,6 +165,6 @@ class RebuildChunkListQservRequest : public ChunkListQservRequest { RebuildChunkListQservRequest(bool reload, CallbackType onFinish); }; -} // namespace lsst::qserv::wpublish +} // namespace lsst::qserv::xrdreq -#endif // LSST_QSERV_WPUBLISH_CHUNK_LIST_QSERV_REQUEST_H +#endif // LSST_QSERV_XRDREQ_CHUNK_LIST_QSERV_REQUEST_H diff --git a/src/wpublish/GetChunkListQservRequest.cc b/src/xrdreq/GetChunkListQservRequest.cc similarity index 91% rename from src/wpublish/GetChunkListQservRequest.cc rename to src/xrdreq/GetChunkListQservRequest.cc index c2b593a7df..fb46026f06 100644 --- a/src/wpublish/GetChunkListQservRequest.cc +++ b/src/xrdreq/GetChunkListQservRequest.cc @@ -21,7 +21,7 @@ */ // Class header -#include "wpublish/GetChunkListQservRequest.h" +#include "xrdreq/GetChunkListQservRequest.h" // System headers #include @@ -35,21 +35,21 @@ using namespace std; namespace { -LOG_LOGGER _log = LOG_GET("lsst.qserv.wpublish.GetChunkListQservRequest"); +LOG_LOGGER _log = LOG_GET("lsst.qserv.xrdreq.GetChunkListQservRequest"); -wpublish::GetChunkListQservRequest::Status translate(proto::WorkerCommandGetChunkListR::Status status) { +xrdreq::GetChunkListQservRequest::Status translate(proto::WorkerCommandGetChunkListR::Status status) { switch (status) { case proto::WorkerCommandGetChunkListR::SUCCESS: - return wpublish::GetChunkListQservRequest::SUCCESS; + return xrdreq::GetChunkListQservRequest::SUCCESS; case proto::WorkerCommandGetChunkListR::ERROR: - return wpublish::GetChunkListQservRequest::ERROR; + return xrdreq::GetChunkListQservRequest::ERROR; } throw domain_error("GetChunkListQservRequest::translate no match for Protobuf status: " + proto::WorkerCommandGetChunkListR_Status_Name(status)); } } // namespace -namespace lsst::qserv::wpublish { +namespace lsst::qserv::xrdreq { string GetChunkListQservRequest::status2str(Status status) { switch (status) { @@ -135,4 +135,4 @@ void GetChunkListQservRequest::onError(string const& error) { } } -} // namespace lsst::qserv::wpublish +} // namespace lsst::qserv::xrdreq diff --git a/src/wpublish/GetChunkListQservRequest.h b/src/xrdreq/GetChunkListQservRequest.h similarity index 92% rename from src/wpublish/GetChunkListQservRequest.h rename to src/xrdreq/GetChunkListQservRequest.h index 6e9c35c021..20627b9015 100644 --- a/src/wpublish/GetChunkListQservRequest.h +++ b/src/xrdreq/GetChunkListQservRequest.h @@ -20,8 +20,8 @@ * the GNU General Public License along with this program. If not, * see . */ -#ifndef LSST_QSERV_WPUBLISH_GET_CHUNK_LIST_QSERV_REQUEST_H -#define LSST_QSERV_WPUBLISH_GET_CHUNK_LIST_QSERV_REQUEST_H +#ifndef LSST_QSERV_XRDREQ_GET_CHUNK_LIST_QSERV_REQUEST_H +#define LSST_QSERV_XRDREQ_GET_CHUNK_LIST_QSERV_REQUEST_H // System headers #include @@ -29,9 +29,9 @@ #include // Qserv headers -#include "wpublish/QservRequest.h" +#include "xrdreq/QservRequest.h" -namespace lsst::qserv::wpublish { +namespace lsst::qserv::xrdreq { /** * Class GetChunkListQservRequest implements the client-side requests @@ -108,6 +108,6 @@ class GetChunkListQservRequest : public QservRequest { CallbackType _onFinish; }; -} // namespace lsst::qserv::wpublish +} // namespace lsst::qserv::xrdreq -#endif // LSST_QSERV_WPUBLISH_GET_CHUNK_LIST_QSERV_REQUEST_H +#endif // LSST_QSERV_XRDREQ_GET_CHUNK_LIST_QSERV_REQUEST_H diff --git a/src/wpublish/GetStatusQservRequest.cc b/src/xrdreq/GetStatusQservRequest.cc similarity index 95% rename from src/wpublish/GetStatusQservRequest.cc rename to src/xrdreq/GetStatusQservRequest.cc index f4522ecfd6..56c1960af6 100644 --- a/src/wpublish/GetStatusQservRequest.cc +++ b/src/xrdreq/GetStatusQservRequest.cc @@ -21,7 +21,7 @@ */ // Class header -#include "wpublish/GetStatusQservRequest.h" +#include "xrdreq/GetStatusQservRequest.h" // LSST headers #include "lsst/log/Log.h" @@ -30,11 +30,11 @@ using namespace std; namespace { -LOG_LOGGER _log = LOG_GET("lsst.qserv.wpublish.GetStatusQservRequest"); +LOG_LOGGER _log = LOG_GET("lsst.qserv.xrdreq.GetStatusQservRequest"); } // namespace -namespace lsst::qserv::wpublish { +namespace lsst::qserv::xrdreq { string GetStatusQservRequest::status2str(Status status) { switch (status) { @@ -114,4 +114,4 @@ void GetStatusQservRequest::onError(string const& error) { } } -} // namespace lsst::qserv::wpublish +} // namespace lsst::qserv::xrdreq diff --git a/src/wpublish/GetStatusQservRequest.h b/src/xrdreq/GetStatusQservRequest.h similarity index 92% rename from src/wpublish/GetStatusQservRequest.h rename to src/xrdreq/GetStatusQservRequest.h index 45b1189362..d557eb7a83 100644 --- a/src/wpublish/GetStatusQservRequest.h +++ b/src/xrdreq/GetStatusQservRequest.h @@ -20,8 +20,8 @@ * the GNU General Public License along with this program. If not, * see . */ -#ifndef LSST_QSERV_WPUBLISH_GET_STATUS_QSERV_REQUEST_H -#define LSST_QSERV_WPUBLISH_GET_STATUS_QSERV_REQUEST_H +#ifndef LSST_QSERV_XRDREQ_GET_STATUS_QSERV_REQUEST_H +#define LSST_QSERV_XRDREQ_GET_STATUS_QSERV_REQUEST_H // System headers #include @@ -32,9 +32,9 @@ // Qserv headers #include "wbase/TaskState.h" -#include "wpublish/QservRequest.h" +#include "xrdreq/QservRequest.h" -namespace lsst::qserv::wpublish { +namespace lsst::qserv::xrdreq { /** * Class GetStatusQservRequest represents a request returning various info @@ -95,6 +95,6 @@ class GetStatusQservRequest : public QservRequest { CallbackType _onFinish; }; -} // namespace lsst::qserv::wpublish +} // namespace lsst::qserv::xrdreq -#endif // LSST_QSERV_WPUBLISH_GET_STATUS_QSERV_REQUEST_H +#endif // LSST_QSERV_XRDREQ_GET_STATUS_QSERV_REQUEST_H diff --git a/src/wpublish/QservRequest.cc b/src/xrdreq/QservRequest.cc similarity index 97% rename from src/wpublish/QservRequest.cc rename to src/xrdreq/QservRequest.cc index a591d0bdde..6b038dc696 100644 --- a/src/wpublish/QservRequest.cc +++ b/src/xrdreq/QservRequest.cc @@ -21,7 +21,7 @@ */ // Class header -#include "wpublish/QservRequest.h" +#include "xrdreq/QservRequest.h" // System headers #include @@ -34,14 +34,14 @@ using namespace std; namespace { -LOG_LOGGER _log = LOG_GET("lsst.qserv.wpublish.QservRequest"); +LOG_LOGGER _log = LOG_GET("lsst.qserv.xrdreq.QservRequest"); // Set this parameter to some reasonable default int const bufInitialSize = 1024; } // namespace -namespace lsst::qserv::wpublish { +namespace lsst::qserv::xrdreq { atomic QservRequest::_numClassInstances(0); @@ -213,4 +213,4 @@ void QservRequest::ProcessResponseData(const XrdSsiErrInfo& eInfo, char* buff, i } } -} // namespace lsst::qserv::wpublish +} // namespace lsst::qserv::xrdreq diff --git a/src/wpublish/QservRequest.h b/src/xrdreq/QservRequest.h similarity index 95% rename from src/wpublish/QservRequest.h rename to src/xrdreq/QservRequest.h index 835cdab3f9..efd57cb670 100644 --- a/src/wpublish/QservRequest.h +++ b/src/xrdreq/QservRequest.h @@ -20,8 +20,8 @@ * the GNU General Public License along with this program. If not, * see . */ -#ifndef LSST_QSERV_WPUBLISH_QSERV_REQUEST_H -#define LSST_QSERV_WPUBLISH_QSERV_REQUEST_H +#ifndef LSST_QSERV_XRDREQ_QSERV_REQUEST_H +#define LSST_QSERV_XRDREQ_QSERV_REQUEST_H // System headers #include @@ -35,7 +35,7 @@ #include "proto/FrameBuffer.h" #include "proto/worker.pb.h" -namespace lsst::qserv::wpublish { +namespace lsst::qserv::xrdreq { /** * Class QservRequest is a base class for a family of the client-side requests @@ -115,6 +115,6 @@ class QservRequest : public XrdSsiRequest { std::shared_ptr _refToSelf4keepAlive; }; -} // namespace lsst::qserv::wpublish +} // namespace lsst::qserv::xrdreq -#endif // LSST_QSERV_WPUBLISH_QSERV_REQUEST_H \ No newline at end of file +#endif // LSST_QSERV_XRDREQ_QSERV_REQUEST_H \ No newline at end of file diff --git a/src/wpublish/QueryManagementAction.cc b/src/xrdreq/QueryManagementAction.cc similarity index 91% rename from src/wpublish/QueryManagementAction.cc rename to src/xrdreq/QueryManagementAction.cc index 5533430b68..759cf3b1dc 100644 --- a/src/wpublish/QueryManagementAction.cc +++ b/src/xrdreq/QueryManagementAction.cc @@ -21,7 +21,7 @@ */ // Class header -#include "wpublish/QueryManagementAction.h" +#include "xrdreq/QueryManagementAction.h" // System headers #include @@ -33,7 +33,7 @@ #include "XrdSsi/XrdSsiService.hh" // Qserv headers -#include "wpublish/QueryManagementRequest.h" +#include "xrdreq/QueryManagementRequest.h" // LSST headers #include "lsst/log/Log.h" @@ -44,7 +44,7 @@ extern XrdSsiProvider* XrdSsiProviderClient; using namespace std; namespace { -LOG_LOGGER _log = LOG_GET("lsst.qserv.wpublish.QueryManagementAction"); +LOG_LOGGER _log = LOG_GET("lsst.qserv.xrdreq.QueryManagementAction"); string xrootdStatus2str(XrdCl::XRootDStatus const& s) { return "status=" + to_string(s.status) + ", code=" + to_string(s.code) + ", errNo=" + to_string(s.errNo) + @@ -60,7 +60,7 @@ struct LocationInfoRAII { } // namespace -namespace lsst::qserv::wpublish { +namespace lsst::qserv::xrdreq { void QueryManagementAction::notifyAllWorkers(string const& xrootdFrontendUrl, proto::QueryManagement::Operation op, QueryId queryId, @@ -117,11 +117,11 @@ void QueryManagementAction::_notifyAllWorkers(std::string const& xrootdFrontendU } // Make and configure the request object - auto request = wpublish::QueryManagementRequest::create( + auto request = xrdreq::QueryManagementRequest::create( op, queryId, - [self, workerAddress, onFinish](wpublish::QueryManagementRequest::Status status, + [self, workerAddress, onFinish](xrdreq::QueryManagementRequest::Status status, string const& error) { - if (status != wpublish::QueryManagementRequest::Status::SUCCESS) { + if (status != xrdreq::QueryManagementRequest::Status::SUCCESS) { self->_response[workerAddress] = error; } if (++(self->_numWorkerRequestsFinished) == self->_response.size()) { @@ -135,4 +135,4 @@ void QueryManagementAction::_notifyAllWorkers(std::string const& xrootdFrontendU } } -} // namespace lsst::qserv::wpublish +} // namespace lsst::qserv::xrdreq diff --git a/src/wpublish/QueryManagementAction.h b/src/xrdreq/QueryManagementAction.h similarity index 93% rename from src/wpublish/QueryManagementAction.h rename to src/xrdreq/QueryManagementAction.h index 78f79e3250..aa63c9ddec 100644 --- a/src/wpublish/QueryManagementAction.h +++ b/src/xrdreq/QueryManagementAction.h @@ -19,8 +19,8 @@ * the GNU General Public License along with this program. If not, * see . */ -#ifndef LSST_QSERV_WPUBLISH_QUERY_MANAGEMENT_ACTION_H -#define LSST_QSERV_WPUBLISH_QUERY_MANAGEMENT_ACTION_H +#ifndef LSST_QSERV_XRDREQ_QUERY_MANAGEMENT_ACTION_H +#define LSST_QSERV_XRDREQ_QUERY_MANAGEMENT_ACTION_H // System headers #include @@ -33,7 +33,7 @@ #include "global/intTypes.h" #include "proto/worker.pb.h" -namespace lsst::qserv::wpublish { +namespace lsst::qserv::xrdreq { /** * Class QueryManagementAction is an interface for managing query completion/cancellation @@ -91,6 +91,6 @@ class QueryManagementAction : public std::enable_shared_from_this _numWorkerRequestsFinished{0}; }; -} // namespace lsst::qserv::wpublish +} // namespace lsst::qserv::xrdreq -#endif // LSST_QSERV_WPUBLISH_QUERY_MANAGEMENT_ACTION_H +#endif // LSST_QSERV_XRDREQ_QUERY_MANAGEMENT_ACTION_H diff --git a/src/wpublish/QueryManagementRequest.cc b/src/xrdreq/QueryManagementRequest.cc similarity index 94% rename from src/wpublish/QueryManagementRequest.cc rename to src/xrdreq/QueryManagementRequest.cc index c352c01c16..e37a907c20 100644 --- a/src/wpublish/QueryManagementRequest.cc +++ b/src/xrdreq/QueryManagementRequest.cc @@ -21,7 +21,7 @@ */ // Class header -#include "wpublish/QueryManagementRequest.h" +#include "xrdreq/QueryManagementRequest.h" // System headers #include @@ -32,10 +32,10 @@ using namespace std; namespace { -LOG_LOGGER _log = LOG_GET("lsst.qserv.wpublish.QueryManagementRequest"); +LOG_LOGGER _log = LOG_GET("lsst.qserv.xrdreq.QueryManagementRequest"); } // namespace -namespace lsst::qserv::wpublish { +namespace lsst::qserv::xrdreq { string QueryManagementRequest::status2str(Status status) { switch (status) { @@ -103,4 +103,4 @@ void QueryManagementRequest::onError(string const& error) { } } -} // namespace lsst::qserv::wpublish +} // namespace lsst::qserv::xrdreq diff --git a/src/wpublish/QueryManagementRequest.h b/src/xrdreq/QueryManagementRequest.h similarity index 92% rename from src/wpublish/QueryManagementRequest.h rename to src/xrdreq/QueryManagementRequest.h index 50287c79a9..93471abf96 100644 --- a/src/wpublish/QueryManagementRequest.h +++ b/src/xrdreq/QueryManagementRequest.h @@ -19,8 +19,8 @@ * the GNU General Public License along with this program. If not, * see . */ -#ifndef LSST_QSERV_WPUBLISH_QUERY_MANAGEMENT_REQUEST_H -#define LSST_QSERV_WPUBLISH_QUERY_MANAGEMENT_REQUEST_H +#ifndef LSST_QSERV_XRDREQ_QUERY_MANAGEMENT_REQUEST_H +#define LSST_QSERV_XRDREQ_QUERY_MANAGEMENT_REQUEST_H // System headers #include @@ -30,9 +30,9 @@ // Qserv headers #include "global/intTypes.h" #include "proto/worker.pb.h" -#include "wpublish/QservRequest.h" +#include "xrdreq/QservRequest.h" -namespace lsst::qserv::wpublish { +namespace lsst::qserv::xrdreq { /** * Class QueryManagementRequest represents requests for managing query @@ -96,6 +96,6 @@ class QueryManagementRequest : public QservRequest { CallbackType _onFinish; }; -} // namespace lsst::qserv::wpublish +} // namespace lsst::qserv::xrdreq -#endif // LSST_QSERV_WPUBLISH_QUERY_MANAGEMENT_REQUEST_H +#endif // LSST_QSERV_XRDREQ_QUERY_MANAGEMENT_REQUEST_H diff --git a/src/wpublish/SetChunkListQservRequest.cc b/src/xrdreq/SetChunkListQservRequest.cc similarity index 90% rename from src/wpublish/SetChunkListQservRequest.cc rename to src/xrdreq/SetChunkListQservRequest.cc index 12806701d4..8038b96b06 100644 --- a/src/wpublish/SetChunkListQservRequest.cc +++ b/src/xrdreq/SetChunkListQservRequest.cc @@ -21,7 +21,7 @@ */ // Class header -#include "wpublish/SetChunkListQservRequest.h" +#include "xrdreq/SetChunkListQservRequest.h" // System headers #include @@ -35,25 +35,25 @@ using namespace lsst::qserv; namespace { -LOG_LOGGER _log = LOG_GET("lsst.qserv.wpublish.SetChunkListQservRequest"); +LOG_LOGGER _log = LOG_GET("lsst.qserv.xrdreq.SetChunkListQservRequest"); -wpublish::SetChunkListQservRequest::Status translate(proto::WorkerCommandSetChunkListR::Status status) { +xrdreq::SetChunkListQservRequest::Status translate(proto::WorkerCommandSetChunkListR::Status status) { switch (status) { case proto::WorkerCommandSetChunkListR::SUCCESS: - return wpublish::SetChunkListQservRequest::SUCCESS; + return xrdreq::SetChunkListQservRequest::SUCCESS; case proto::WorkerCommandSetChunkListR::INVALID: - return wpublish::SetChunkListQservRequest::INVALID; + return xrdreq::SetChunkListQservRequest::INVALID; case proto::WorkerCommandSetChunkListR::IN_USE: - return wpublish::SetChunkListQservRequest::IN_USE; + return xrdreq::SetChunkListQservRequest::IN_USE; case proto::WorkerCommandSetChunkListR::ERROR: - return wpublish::SetChunkListQservRequest::ERROR; + return xrdreq::SetChunkListQservRequest::ERROR; } throw domain_error("SetChunkListQservRequest::translate no match for Protobuf status: " + proto::WorkerCommandSetChunkListR_Status_Name(status)); } } // namespace -namespace lsst::qserv::wpublish { +namespace lsst::qserv::xrdreq { string SetChunkListQservRequest::status2str(Status status) { switch (status) { @@ -156,4 +156,4 @@ void SetChunkListQservRequest::onError(string const& error) { } } -} // namespace lsst::qserv::wpublish +} // namespace lsst::qserv::xrdreq diff --git a/src/wpublish/SetChunkListQservRequest.h b/src/xrdreq/SetChunkListQservRequest.h similarity index 93% rename from src/wpublish/SetChunkListQservRequest.h rename to src/xrdreq/SetChunkListQservRequest.h index 3ed03ac845..d43f2c0f96 100644 --- a/src/wpublish/SetChunkListQservRequest.h +++ b/src/xrdreq/SetChunkListQservRequest.h @@ -20,8 +20,8 @@ * the GNU General Public License along with this program. If not, * see . */ -#ifndef LSST_QSERV_WPUBLISH_SET_CHUNK_LIST_QSERV_REQUEST_H -#define LSST_QSERV_WPUBLISH_SET_CHUNK_LIST_QSERV_REQUEST_H +#ifndef LSST_QSERV_XRDREQ_SET_CHUNK_LIST_QSERV_REQUEST_H +#define LSST_QSERV_XRDREQ_SET_CHUNK_LIST_QSERV_REQUEST_H // System headers #include @@ -30,9 +30,9 @@ #include // Qserv headers -#include "wpublish/QservRequest.h" +#include "xrdreq/QservRequest.h" -namespace lsst::qserv::wpublish { +namespace lsst::qserv::xrdreq { /** * Class SetChunkListQservRequest implements the client-side requests @@ -116,6 +116,6 @@ class SetChunkListQservRequest : public QservRequest { CallbackType _onFinish; }; -} // namespace lsst::qserv::wpublish +} // namespace lsst::qserv::xrdreq -#endif // LSST_QSERV_WPUBLISH_SET_CHUNK_LIST_QSERV_REQUEST_H +#endif // LSST_QSERV_XRDREQ_SET_CHUNK_LIST_QSERV_REQUEST_H diff --git a/src/wpublish/TestEchoQservRequest.cc b/src/xrdreq/TestEchoQservRequest.cc similarity index 91% rename from src/wpublish/TestEchoQservRequest.cc rename to src/xrdreq/TestEchoQservRequest.cc index d6df7166ff..ad9bda0342 100644 --- a/src/wpublish/TestEchoQservRequest.cc +++ b/src/xrdreq/TestEchoQservRequest.cc @@ -21,7 +21,7 @@ */ // Class header -#include "wpublish/TestEchoQservRequest.h" +#include "xrdreq/TestEchoQservRequest.h" // System headers #include @@ -34,23 +34,23 @@ using namespace std; namespace { -LOG_LOGGER _log = LOG_GET("lsst.qserv.wpublish.TestEchoQservRequest"); +LOG_LOGGER _log = LOG_GET("lsst.qserv.xrdreq.TestEchoQservRequest"); using namespace lsst::qserv; -wpublish::TestEchoQservRequest::Status translate(proto::WorkerCommandTestEchoR::Status status) { +xrdreq::TestEchoQservRequest::Status translate(proto::WorkerCommandTestEchoR::Status status) { switch (status) { case proto::WorkerCommandTestEchoR::SUCCESS: - return wpublish::TestEchoQservRequest::SUCCESS; + return xrdreq::TestEchoQservRequest::SUCCESS; case proto::WorkerCommandTestEchoR::ERROR: - return wpublish::TestEchoQservRequest::ERROR; + return xrdreq::TestEchoQservRequest::ERROR; } throw domain_error("TestEchoQservRequest::" + string(__func__) + " no match for Protobuf status: " + proto::WorkerCommandTestEchoR_Status_Name(status)); } } // namespace -namespace lsst::qserv::wpublish { +namespace lsst::qserv::xrdreq { string TestEchoQservRequest::status2str(Status status) { switch (status) { @@ -126,4 +126,4 @@ void TestEchoQservRequest::onError(string const& error) { } } -} // namespace lsst::qserv::wpublish +} // namespace lsst::qserv::xrdreq diff --git a/src/wpublish/TestEchoQservRequest.h b/src/xrdreq/TestEchoQservRequest.h similarity index 92% rename from src/wpublish/TestEchoQservRequest.h rename to src/xrdreq/TestEchoQservRequest.h index 0c613598c1..2a375e408d 100644 --- a/src/wpublish/TestEchoQservRequest.h +++ b/src/xrdreq/TestEchoQservRequest.h @@ -20,8 +20,8 @@ * the GNU General Public License along with this program. If not, * see . */ -#ifndef LSST_QSERV_WPUBLISH_TEST_ECHO_QSERV_REQUEST_H -#define LSST_QSERV_WPUBLISH_TEST_ECHO_QSERV_REQUEST_H +#ifndef LSST_QSERV_XRDREQ_TEST_ECHO_QSERV_REQUEST_H +#define LSST_QSERV_XRDREQ_TEST_ECHO_QSERV_REQUEST_H // System headers #include @@ -29,9 +29,9 @@ #include // Qserv headers -#include "wpublish/QservRequest.h" +#include "xrdreq/QservRequest.h" -namespace lsst::qserv::wpublish { +namespace lsst::qserv::xrdreq { /** * Class TestEchoQservRequest represents a simple test request sending a string @@ -99,6 +99,6 @@ class TestEchoQservRequest : public QservRequest { CallbackType _onFinish; }; -} // namespace lsst::qserv::wpublish +} // namespace lsst::qserv::xrdreq -#endif // LSST_QSERV_WPUBLISH_TEST_ECHO_QSERV_REQUEST_H +#endif // LSST_QSERV_XRDREQ_TEST_ECHO_QSERV_REQUEST_H diff --git a/src/wpublish/qserv-query-management.cc b/src/xrdreq/qserv-query-management.cc similarity index 89% rename from src/wpublish/qserv-query-management.cc rename to src/xrdreq/qserv-query-management.cc index d50d0f81ee..65a7b87757 100644 --- a/src/wpublish/qserv-query-management.cc +++ b/src/xrdreq/qserv-query-management.cc @@ -35,8 +35,8 @@ #include "proto/worker.pb.h" #include "util/BlockPost.h" #include "util/CmdLineParser.h" -#include "wpublish/QueryManagementAction.h" -#include "wpublish/QueryManagementRequest.h" +#include "xrdreq/QueryManagementAction.h" +#include "xrdreq/QueryManagementRequest.h" /// This C++ symbol is provided by the SSI shared library extern XrdSsiProvider* XrdSsiProviderClient; @@ -44,7 +44,7 @@ extern XrdSsiProvider* XrdSsiProviderClient; namespace global = lsst::qserv; namespace proto = lsst::qserv::proto; namespace util = lsst::qserv::util; -namespace wpublish = lsst::qserv::wpublish; +namespace xrdreq = lsst::qserv::xrdreq; using namespace std; @@ -72,9 +72,9 @@ proto::QueryManagement::Operation str2operation(string const& str) { int test() { bool finished = false; if (allWorkers) { - wpublish::QueryManagementAction::notifyAllWorkers( + xrdreq::QueryManagementAction::notifyAllWorkers( serviceProviderLocation, operation, queryId, - [&finished](wpublish::QueryManagementAction::Response const& response) { + [&finished](xrdreq::QueryManagementAction::Response const& response) { for (auto itr : response) { cout << "worker: " << itr.first << " error: " << itr.second << endl; } @@ -92,10 +92,10 @@ int test() { cout << "connected to service provider at: " << serviceProviderLocation << endl; // Prepare the request - auto request = wpublish::QueryManagementRequest::create( + auto request = xrdreq::QueryManagementRequest::create( operation, queryId, - [&finished](wpublish::QueryManagementRequest::Status status, string const& error) { - cout << "status=" << wpublish::QueryManagementRequest::status2str(status) << ", error='" + [&finished](xrdreq::QueryManagementRequest::Status status, string const& error) { + cout << "status=" << xrdreq::QueryManagementRequest::status2str(status) << ", error='" << error << "'" << endl; finished = true; }); diff --git a/src/wpublish/qserv-worker-notify.cc b/src/xrdreq/qserv-worker-notify.cc similarity index 73% rename from src/wpublish/qserv-worker-notify.cc rename to src/xrdreq/qserv-worker-notify.cc index a90d7c59c3..e82e135340 100644 --- a/src/wpublish/qserv-worker-notify.cc +++ b/src/xrdreq/qserv-worker-notify.cc @@ -17,19 +17,19 @@ #include "proto/worker.pb.h" #include "util/BlockPost.h" #include "util/CmdLineParser.h" -#include "wpublish/ChunkGroupQservRequest.h" -#include "wpublish/ChunkListQservRequest.h" -#include "wpublish/GetChunkListQservRequest.h" -#include "wpublish/GetStatusQservRequest.h" -#include "wpublish/SetChunkListQservRequest.h" -#include "wpublish/TestEchoQservRequest.h" +#include "xrdreq/ChunkGroupQservRequest.h" +#include "xrdreq/ChunkListQservRequest.h" +#include "xrdreq/GetChunkListQservRequest.h" +#include "xrdreq/GetStatusQservRequest.h" +#include "xrdreq/SetChunkListQservRequest.h" +#include "xrdreq/TestEchoQservRequest.h" /// This C++ symbol is provided by the SSI shared library extern XrdSsiProvider* XrdSsiProviderClient; namespace global = lsst::qserv; namespace util = lsst::qserv::util; -namespace wpublish = lsst::qserv::wpublish; +namespace xrdreq = lsst::qserv::xrdreq; using namespace std; @@ -65,7 +65,7 @@ bool printReport; * * @param chunk - collection to be initialize */ -void readInFile(wpublish::SetChunkListQservRequest::ChunkCollection& chunks, vector& databases) { +void readInFile(xrdreq::SetChunkListQservRequest::ChunkCollection& chunks, vector& databases) { chunks.clear(); databases.clear(); @@ -90,7 +90,7 @@ void readInFile(wpublish::SetChunkListQservRequest::ChunkCollection& chunks, vec } unsigned long const chunk = stoul(databaseAndChunk.substr(pos + 1)); string const database = databaseAndChunk.substr(0, pos); - chunks.emplace_back(wpublish::SetChunkListQservRequest::Chunk{ + chunks.emplace_back(xrdreq::SetChunkListQservRequest::Chunk{ (unsigned int)chunk, database, 0 /* use_count (UNUSED) */ }); uniqueDatabaseNames.insert(database); @@ -105,14 +105,14 @@ int test() { atomic finished(false); - shared_ptr request = nullptr; + shared_ptr request = nullptr; if ("GET_CHUNK_LIST" == operation) { - request = wpublish::GetChunkListQservRequest::create( - inUseOnly, [&finished](wpublish::GetChunkListQservRequest::Status status, string const& error, - wpublish::GetChunkListQservRequest::ChunkCollection const& chunks) { - if (status != wpublish::GetChunkListQservRequest::Status::SUCCESS) { - cout << "status: " << wpublish::GetChunkListQservRequest::status2str(status) << "\n" + request = xrdreq::GetChunkListQservRequest::create( + inUseOnly, [&finished](xrdreq::GetChunkListQservRequest::Status status, string const& error, + xrdreq::GetChunkListQservRequest::ChunkCollection const& chunks) { + if (status != xrdreq::GetChunkListQservRequest::Status::SUCCESS) { + cout << "status: " << xrdreq::GetChunkListQservRequest::status2str(status) << "\n" << "error: " << error << endl; } else { cout << "# total chunks: " << chunks.size() << "\n" << endl; @@ -131,16 +131,16 @@ int test() { }); } else if ("SET_CHUNK_LIST" == operation) { - wpublish::SetChunkListQservRequest::ChunkCollection chunks; + xrdreq::SetChunkListQservRequest::ChunkCollection chunks; vector databases; readInFile(chunks, databases); - request = wpublish::SetChunkListQservRequest::create( + request = xrdreq::SetChunkListQservRequest::create( chunks, databases, force, - [&finished](wpublish::SetChunkListQservRequest::Status status, string const& error, - wpublish::SetChunkListQservRequest::ChunkCollection const& chunks) { - if (status != wpublish::SetChunkListQservRequest::Status::SUCCESS) { - cout << "status: " << wpublish::SetChunkListQservRequest::status2str(status) << "\n" + [&finished](xrdreq::SetChunkListQservRequest::Status status, string const& error, + xrdreq::SetChunkListQservRequest::ChunkCollection const& chunks) { + if (status != xrdreq::SetChunkListQservRequest::Status::SUCCESS) { + cout << "status: " << xrdreq::SetChunkListQservRequest::status2str(status) << "\n" << "error: " << error << endl; } else { cout << "# total chunks: " << chunks.size() << "\n" << endl; @@ -159,12 +159,12 @@ int test() { }); } else if ("REBUILD_CHUNK_LIST" == operation) { - request = wpublish::RebuildChunkListQservRequest::create( - reload, [&finished](wpublish::ChunkListQservRequest::Status status, string const& error, - wpublish::ChunkListQservRequest::ChunkCollection const& added, - wpublish::ChunkListQservRequest::ChunkCollection const& removed) { - if (status != wpublish::ChunkListQservRequest::Status::SUCCESS) { - cout << "status: " << wpublish::ChunkListQservRequest::status2str(status) << "\n" + request = xrdreq::RebuildChunkListQservRequest::create( + reload, [&finished](xrdreq::ChunkListQservRequest::Status status, string const& error, + xrdreq::ChunkListQservRequest::ChunkCollection const& added, + xrdreq::ChunkListQservRequest::ChunkCollection const& removed) { + if (status != xrdreq::ChunkListQservRequest::Status::SUCCESS) { + cout << "status: " << xrdreq::ChunkListQservRequest::status2str(status) << "\n" << "error: " << error << endl; } else { cout << "# chunks added: " << added.size() << "\n" @@ -174,12 +174,12 @@ int test() { }); } else if ("RELOAD_CHUNK_LIST" == operation) { - request = wpublish::ReloadChunkListQservRequest::create( - [&finished](wpublish::ChunkListQservRequest::Status status, string const& error, - wpublish::ChunkListQservRequest::ChunkCollection const& added, - wpublish::ChunkListQservRequest::ChunkCollection const& removed) { - if (status != wpublish::ChunkListQservRequest::Status::SUCCESS) { - cout << "status: " << wpublish::ChunkListQservRequest::status2str(status) << "\n" + request = xrdreq::ReloadChunkListQservRequest::create( + [&finished](xrdreq::ChunkListQservRequest::Status status, string const& error, + xrdreq::ChunkListQservRequest::ChunkCollection const& added, + xrdreq::ChunkListQservRequest::ChunkCollection const& removed) { + if (status != xrdreq::ChunkListQservRequest::Status::SUCCESS) { + cout << "status: " << xrdreq::ChunkListQservRequest::status2str(status) << "\n" << "error: " << error << endl; } else { cout << "# chunks added: " << added.size() << "\n" @@ -189,33 +189,32 @@ int test() { }); } else if ("ADD_CHUNK_GROUP" == operation) { - request = wpublish::AddChunkGroupQservRequest::create( - chunk, dbs, - [&finished](wpublish::ChunkGroupQservRequest::Status status, string const& error) { - if (status != wpublish::ChunkGroupQservRequest::Status::SUCCESS) { - cout << "status: " << wpublish::ChunkGroupQservRequest::status2str(status) << "\n" + request = xrdreq::AddChunkGroupQservRequest::create( + chunk, dbs, [&finished](xrdreq::ChunkGroupQservRequest::Status status, string const& error) { + if (status != xrdreq::ChunkGroupQservRequest::Status::SUCCESS) { + cout << "status: " << xrdreq::ChunkGroupQservRequest::status2str(status) << "\n" << "error: " << error << endl; } finished = true; }); } else if ("REMOVE_CHUNK_GROUP" == operation) { - request = wpublish::RemoveChunkGroupQservRequest::create( + request = xrdreq::RemoveChunkGroupQservRequest::create( chunk, dbs, force, - [&finished](wpublish::ChunkGroupQservRequest::Status status, string const& error) { - if (status != wpublish::ChunkGroupQservRequest::Status::SUCCESS) { - cout << "status: " << wpublish::ChunkGroupQservRequest::status2str(status) << "\n" + [&finished](xrdreq::ChunkGroupQservRequest::Status status, string const& error) { + if (status != xrdreq::ChunkGroupQservRequest::Status::SUCCESS) { + cout << "status: " << xrdreq::ChunkGroupQservRequest::status2str(status) << "\n" << "error: " << error << endl; } finished = true; }); } else if ("TEST_ECHO" == operation) { - request = wpublish::TestEchoQservRequest::create( - value, [&finished](wpublish::TestEchoQservRequest::Status status, string const& error, + request = xrdreq::TestEchoQservRequest::create( + value, [&finished](xrdreq::TestEchoQservRequest::Status status, string const& error, string const& sent, string const& received) { - if (status != wpublish::TestEchoQservRequest::Status::SUCCESS) { - cout << "status: " << wpublish::TestEchoQservRequest::status2str(status) << "\n" + if (status != xrdreq::TestEchoQservRequest::Status::SUCCESS) { + cout << "status: " << xrdreq::TestEchoQservRequest::status2str(status) << "\n" << "error: " << error << endl; } else { cout << "value sent: " << sent << "\n" @@ -225,12 +224,12 @@ int test() { }); } else if ("GET_STATUS" == operation) { - request = wpublish::GetStatusQservRequest::create( + request = xrdreq::GetStatusQservRequest::create( includeTasks, queryIds, - [&finished](wpublish::GetStatusQservRequest::Status status, string const& error, + [&finished](xrdreq::GetStatusQservRequest::Status status, string const& error, string const& info) { - if (status != wpublish::GetStatusQservRequest::Status::SUCCESS) { - cout << "status: " << wpublish::GetStatusQservRequest::status2str(status) << "\n" + if (status != xrdreq::GetStatusQservRequest::Status::SUCCESS) { + cout << "status: " << xrdreq::GetStatusQservRequest::status2str(status) << "\n" << "error: " << error << endl; } else { cout << "worker info: " << info << endl; diff --git a/src/wpublish/qserv-worker-perf-chunks.cc b/src/xrdreq/qserv-worker-perf-chunks.cc similarity index 99% rename from src/wpublish/qserv-worker-perf-chunks.cc rename to src/xrdreq/qserv-worker-perf-chunks.cc index cce8270fb3..78efb09bf1 100644 --- a/src/wpublish/qserv-worker-perf-chunks.cc +++ b/src/xrdreq/qserv-worker-perf-chunks.cc @@ -48,19 +48,19 @@ #include "util/BlockPost.h" #include "util/CmdLineParser.h" #include "util/File.h" -#include "wpublish/TestEchoQservRequest.h" +#include "xrdreq/TestEchoQservRequest.h" /// This C++ symbol is provided by the SSI shared library extern XrdSsiProvider* XrdSsiProviderClient; namespace global = lsst::qserv; namespace util = lsst::qserv::util; -namespace wpublish = lsst::qserv::wpublish; +namespace xrdreq = lsst::qserv::xrdreq; using namespace std; using namespace std::placeholders; -using RequestT = wpublish::TestEchoQservRequest; +using RequestT = xrdreq::TestEchoQservRequest; namespace { diff --git a/src/wpublish/qserv-worker-perf.cc b/src/xrdreq/qserv-worker-perf.cc similarity index 85% rename from src/wpublish/qserv-worker-perf.cc rename to src/xrdreq/qserv-worker-perf.cc index 3a17fa6cf2..03d6b633c8 100644 --- a/src/wpublish/qserv-worker-perf.cc +++ b/src/xrdreq/qserv-worker-perf.cc @@ -16,14 +16,14 @@ #include "util/BlockPost.h" #include "util/CmdLineParser.h" #include "util/File.h" -#include "wpublish/TestEchoQservRequest.h" +#include "xrdreq/TestEchoQservRequest.h" /// This C++ symbol is provided by the SSI shared library extern XrdSsiProvider* XrdSsiProviderClient; namespace global = lsst::qserv; namespace util = lsst::qserv::util; -namespace wpublish = lsst::qserv::wpublish; +namespace xrdreq = lsst::qserv::xrdreq; using namespace std; @@ -57,7 +57,7 @@ int test() { cout << "connected to service provider at: " << serviceProviderLocation << endl; // Store request pointers here to prevent them deleted too early - vector requests; + vector requests; atomic finished(0); @@ -66,12 +66,11 @@ int test() { string const& worker = workers[j]; for (unsigned int i = 0; i < numRequests; ++i) { - auto request = wpublish::TestEchoQservRequest::create( - value, [&finished](wpublish::TestEchoQservRequest::Status status, string const& error, + auto request = xrdreq::TestEchoQservRequest::create( + value, [&finished](xrdreq::TestEchoQservRequest::Status status, string const& error, string const& sent, string const& received) { - if (status != wpublish::TestEchoQservRequest::Status::SUCCESS) { - cout << "status: " << wpublish::TestEchoQservRequest::status2str(status) - << "\n" + if (status != xrdreq::TestEchoQservRequest::Status::SUCCESS) { + cout << "status: " << xrdreq::TestEchoQservRequest::status2str(status) << "\n" << "error: " << error << endl; } else { cout << "value sent: " << sent << "\n" @@ -92,12 +91,11 @@ int test() { for (unsigned int i = 0; i < numRequests; ++i) { for (unsigned int j = 0; j < numWorkers; ++j) { string const& worker = workers[j]; - auto request = wpublish::TestEchoQservRequest::create( - value, [&finished](wpublish::TestEchoQservRequest::Status status, string const& error, + auto request = xrdreq::TestEchoQservRequest::create( + value, [&finished](xrdreq::TestEchoQservRequest::Status status, string const& error, string const& sent, string const& received) { - if (status != wpublish::TestEchoQservRequest::Status::SUCCESS) { - cout << "status: " << wpublish::TestEchoQservRequest::status2str(status) - << "\n" + if (status != xrdreq::TestEchoQservRequest::Status::SUCCESS) { + cout << "status: " << xrdreq::TestEchoQservRequest::status2str(status) << "\n" << "error: " << error << endl; } else { cout << "value sent: " << sent << "\n" diff --git a/src/wpublish/qserv-worker-status.cc b/src/xrdreq/qserv-worker-status.cc similarity index 87% rename from src/wpublish/qserv-worker-status.cc rename to src/xrdreq/qserv-worker-status.cc index 9ce7c580f3..4e16a43d22 100644 --- a/src/wpublish/qserv-worker-status.cc +++ b/src/xrdreq/qserv-worker-status.cc @@ -16,14 +16,14 @@ #include "util/BlockPost.h" #include "util/CmdLineParser.h" #include "util/File.h" -#include "wpublish/GetStatusQservRequest.h" +#include "xrdreq/GetStatusQservRequest.h" /// This C++ symbol is provided by the SSI shared library extern XrdSsiProvider* XrdSsiProviderClient; namespace global = lsst::qserv; namespace util = lsst::qserv::util; -namespace wpublish = lsst::qserv::wpublish; +namespace xrdreq = lsst::qserv::xrdreq; using namespace std; @@ -58,7 +58,7 @@ int test() { cout << "connected to service provider at: " << serviceProviderLocation << endl; // Store request pointers here to prevent them deleted too early - vector requests; + vector requests; atomic finished(0); @@ -67,12 +67,12 @@ int test() { string const& worker = workers[j]; for (unsigned int i = 0; i < numRequests; ++i) { - auto request = wpublish::GetStatusQservRequest::create( + auto request = xrdreq::GetStatusQservRequest::create( includeTasks, queryIds, - [&finished](wpublish::GetStatusQservRequest::Status status, string const& error, + [&finished](xrdreq::GetStatusQservRequest::Status status, string const& error, string const& info) { - if (status != wpublish::GetStatusQservRequest::Status::SUCCESS) { - cout << "status: " << wpublish::GetStatusQservRequest::status2str(status) + if (status != xrdreq::GetStatusQservRequest::Status::SUCCESS) { + cout << "status: " << xrdreq::GetStatusQservRequest::status2str(status) << "\n" << "error: " << error << endl; } else { @@ -93,11 +93,11 @@ int test() { for (unsigned int i = 0; i < numRequests; ++i) { for (unsigned int j = 0; j < numWorkers; ++j) { string const& worker = workers[j]; - auto request = wpublish::GetStatusQservRequest::create( - [&finished](wpublish::GetStatusQservRequest::Status status, string const& error, + auto request = xrdreq::GetStatusQservRequest::create( + [&finished](xrdreq::GetStatusQservRequest::Status status, string const& error, string const& info) { - if (status != wpublish::GetStatusQservRequest::Status::SUCCESS) { - cout << "status: " << wpublish::GetStatusQservRequest::status2str(status) + if (status != xrdreq::GetStatusQservRequest::Status::SUCCESS) { + cout << "status: " << xrdreq::GetStatusQservRequest::status2str(status) << "\n" << "error: " << error << endl; } else { From e503efe38578957ec8c0e99537b4eb5687f595f0 Mon Sep 17 00:00:00 2001 From: Igor Gaponenko Date: Mon, 12 Jun 2023 18:56:49 -0700 Subject: [PATCH 7/8] Wired the query control to notify workers on query completion/cancellation Added Czar configuration options to disable this feature if needed. The options will exist for some time before the new code will be proven to work w/o any side effects to the query processing. --- .../templates/proxy/etc/qserv-czar.cnf.jinja | 12 ++++++ src/ccontrol/CMakeLists.txt | 1 + src/ccontrol/UserQuerySelect.cc | 25 ++++++++++-- src/czar/Czar.cc | 38 +++++++++++++++++++ src/czar/Czar.h | 4 ++ src/czar/CzarConfig.cc | 4 +- src/czar/CzarConfig.h | 15 ++++++++ 7 files changed, 95 insertions(+), 4 deletions(-) diff --git a/src/admin/templates/proxy/etc/qserv-czar.cnf.jinja b/src/admin/templates/proxy/etc/qserv-czar.cnf.jinja index 21fc80b641..d8ebcf29b6 100644 --- a/src/admin/templates/proxy/etc/qserv-czar.cnf.jinja +++ b/src/admin/templates/proxy/etc/qserv-czar.cnf.jinja @@ -91,6 +91,18 @@ xrootdSpread = 0 # This is per user query and important milestones ignore this limit. qMetaSecsBetweenChunkCompletionUpdates = 59 + +# If not 0 then broadcast query completion/cancellation events to all workers +# so that they would do proper garbage collection and resource recycling. +notifyWorkersOnQueryFinish = 1 + +# If not 0 then broadcast this event to all workers to let them cancel any older +# that were submitted before the restart. The first query identifier in the new +# series will be reported to the workers. The identifier will be used as +# a high watermark for diffirentiating between the older (to be cancelled) +# and the newer queries. +notifyWorkersOnCzarRestart = 1 + #[debug] #chunkLimit = -1 diff --git a/src/ccontrol/CMakeLists.txt b/src/ccontrol/CMakeLists.txt index 91803fca81..f40e15094b 100644 --- a/src/ccontrol/CMakeLists.txt +++ b/src/ccontrol/CMakeLists.txt @@ -29,6 +29,7 @@ target_link_libraries(ccontrol PUBLIC log parser sphgeom + xrdreq ) FUNCTION(ccontrol_tests) diff --git a/src/ccontrol/UserQuerySelect.cc b/src/ccontrol/UserQuerySelect.cc index 0b09539e90..242d30e16f 100644 --- a/src/ccontrol/UserQuerySelect.cc +++ b/src/ccontrol/UserQuerySelect.cc @@ -66,6 +66,7 @@ #include #include #include +#include // Third-party headers #include @@ -78,6 +79,7 @@ #include "ccontrol/MergingHandler.h" #include "ccontrol/TmpTableName.h" #include "ccontrol/UserQueryError.h" +#include "czar/CzarConfig.h" #include "global/constants.h" #include "global/LogContext.h" #include "global/MsgReceiver.h" @@ -103,6 +105,7 @@ #include "sql/Schema.h" #include "util/IterableFormatter.h" #include "util/ThreadPriority.h" +#include "xrdreq/QueryManagementAction.h" namespace { LOG_LOGGER _log = LOG_GET("lsst.qserv.ccontrol.UserQuerySelect"); @@ -352,6 +355,9 @@ QueryState UserQuerySelect::join() { } _executive->updateProxyMessages(); + // Capture these parameters before discarding the merger which would also reset the config. + bool const notifyWorkersOnQueryFinish = _infileMergerConfig->czarConfig.notifyWorkersOnQueryFinish(); + std::string const xrootdFrontendUrl = _infileMergerConfig->czarConfig.getXrootdFrontendUrl(); try { _discardMerger(); } catch (std::exception const& exc) { @@ -367,19 +373,32 @@ QueryState UserQuerySelect::join() { // finalRows < 0 indicates there was no postprocessing, so collected rows and final rows should be the // same. if (finalRows < 0) finalRows = collectedRows; + // Notify workers on the query completion/cancellation to ensure + // resources are properly cleaned over there as well. + proto::QueryManagement::Operation operation = proto::QueryManagement::COMPLETE; + QueryState state = SUCCESS; if (successful) { _qMetaUpdateStatus(qmeta::QInfo::COMPLETED, collectedRows, collectedBytes, finalRows); LOGS(_log, LOG_LVL_INFO, "Joined everything (success)"); - return SUCCESS; } else if (_killed) { // status is already set to ABORTED LOGS(_log, LOG_LVL_ERROR, "Joined everything (killed)"); - return ERROR; + operation = proto::QueryManagement::CANCEL; + state = ERROR; } else { _qMetaUpdateStatus(qmeta::QInfo::FAILED, collectedRows, collectedBytes, finalRows); LOGS(_log, LOG_LVL_ERROR, "Joined everything (failure!)"); - return ERROR; + operation = proto::QueryManagement::CANCEL; + state = ERROR; } + if (notifyWorkersOnQueryFinish) { + try { + xrdreq::QueryManagementAction::notifyAllWorkers(xrootdFrontendUrl, operation, _qMetaQueryId); + } catch (std::exception const& ex) { + LOGS(_log, LOG_LVL_WARN, ex.what()); + } + } + return state; } /// Release resources held by the merger diff --git a/src/czar/Czar.cc b/src/czar/Czar.cc index d3991d57a3..4e85dc8183 100644 --- a/src/czar/Czar.cc +++ b/src/czar/Czar.cc @@ -25,6 +25,7 @@ // System headers #include +#include #include // Third-party headers @@ -42,6 +43,7 @@ #include "czar/CzarErrors.h" #include "czar/MessageTable.h" #include "global/LogContext.h" +#include "proto/worker.pb.h" #include "qdisp/PseudoFifo.h" #include "qdisp/QdispPool.h" #include "qdisp/SharedResources.h" @@ -54,6 +56,7 @@ #include "util/FileMonitor.h" #include "util/IterableFormatter.h" #include "util/StringHelper.h" +#include "xrdreq/QueryManagementAction.h" #include "XrdSsi/XrdSsiProvider.hh" using namespace std; @@ -96,6 +99,21 @@ Czar::Czar(string const& configPath, string const& czarName) const int year = 60 * 60 * 24 * 365; _idCounter = uint64_t(tv.tv_sec % year) * 1000 + tv.tv_usec / 1000; + // Tell workers to cancel any queries that were submitted before this restart of Czar. + // Figure out which query (if any) was recorded in Czar database before the restart. + // The id will be used as the high-watermark for queries that need to be cancelled. + // All queries that have identifiers that are strictly less than this one will + // be affected by the operation. + if (_czarConfig.notifyWorkersOnCzarRestart()) { + try { + xrdreq::QueryManagementAction::notifyAllWorkers(_czarConfig.getXrootdFrontendUrl(), + proto::QueryManagement::CANCEL_AFTER_RESTART, + _lastQueryIdBeforeRestart()); + } catch (std::exception const& ex) { + LOGS(_log, LOG_LVL_WARN, ex.what()); + } + } + auto databaseModels = qproc::DatabaseModels::create(_czarConfig.getCssConfigMap(), _czarConfig.getMySqlResultConfig()); @@ -461,4 +479,24 @@ void Czar::removeOldResultTables() { _oldTableRemovalThread = std::move(t); } +QueryId Czar::_lastQueryIdBeforeRestart() const { + string const context = "Czar::" + string(__func__) + " "; + auto sqlConn = sql::SqlConnectionFactory::make(_czarConfig.getMySqlQmetaConfig()); + string const sql = "SELECT MAX(queryId) FROM QInfo"; + sql::SqlResults results; + sql::SqlErrorObject err; + if (!sqlConn->runQuery(sql, results, err)) { + string const msg = + context + "Query to find the last query id failed, err=" + err.printErrMsg() + ", sql=" + sql; + throw runtime_error(msg); + } + string queryIdStr; + if (!results.extractFirstValue(queryIdStr, err)) { + string const msg = context + "Failed to extract the last query id from the result set, err=" + + err.printErrMsg() + ", sql=" + sql; + throw runtime_error(msg); + } + return stoull(queryIdStr); +} + } // namespace lsst::qserv::czar diff --git a/src/czar/Czar.h b/src/czar/Czar.h index e80ca6af01..f0acfcf734 100644 --- a/src/czar/Czar.h +++ b/src/czar/Czar.h @@ -39,6 +39,7 @@ #include "ccontrol/UserQueryFactory.h" #include "czar/CzarConfig.h" #include "czar/SubmitResult.h" +#include "global/intTypes.h" #include "global/stringTypes.h" #include "mysql/MySqlConfig.h" #include "qdisp/SharedResources.h" @@ -136,6 +137,9 @@ class Czar { /// Create and fill async result table void _makeAsyncResult(std::string const& asyncResultTable, QueryId queryId, std::string const& resultLoc); + /// @return An identifier of the last query that was recorded in the query metadata table + QueryId _lastQueryIdBeforeRestart() const; + static Ptr _czar; ///< Pointer to single instance of the Czar. // combines client name (ID) and its thread ID into one unique ID diff --git a/src/czar/CzarConfig.cc b/src/czar/CzarConfig.cc index 28a9ad8300..a42847349c 100644 --- a/src/czar/CzarConfig.cc +++ b/src/czar/CzarConfig.cc @@ -91,7 +91,9 @@ CzarConfig::CzarConfig(util::ConfigStore const& configStore) _qdispMaxPriority(configStore.getInt("qdisppool.largestPriority", 2)), _qdispVectRunSizes(configStore.get("qdisppool.vectRunSizes", "50:50:50:50")), _qdispVectMinRunningSizes(configStore.get("qdisppool.vectMinRunningSizes", "0:1:3:3")), - _qReqPseudoFifoMaxRunning(configStore.getInt("qdisppool.qReqPseudoFifoMaxRunning", 300)) {} + _qReqPseudoFifoMaxRunning(configStore.getInt("qdisppool.qReqPseudoFifoMaxRunning", 300)), + _notifyWorkersOnQueryFinish(configStore.getInt("tuning.notifyWorkersOnQueryFinish", 1)), + _notifyWorkersOnCzarRestart(configStore.getInt("tuning.notifyWorkersOnCzarRestart", 1)) {} std::ostream& operator<<(std::ostream& out, CzarConfig const& czarConfig) { out << "[cssConfigMap=" << util::printable(czarConfig._cssConfigMap) diff --git a/src/czar/CzarConfig.h b/src/czar/CzarConfig.h index f30550995c..64bc8f035a 100644 --- a/src/czar/CzarConfig.h +++ b/src/czar/CzarConfig.h @@ -160,6 +160,17 @@ class CzarConfig { int getOldestResultKeptDays() const { return _oldestResultKeptDays; } + /// @return 'true' to allow broadcasting query completion/cancellation events + /// to all workers so that they would do proper garbage collection and resource recycling. + bool notifyWorkersOnQueryFinish() const { return _notifyWorkersOnQueryFinish != 0; } + + /// @return 'true' to allow broadcasting this event to all workers to let them cancel + /// any older that were submitted before the restart. The first query identifier in the new + /// series will be reported to the workers. The identifier will be used as + /// a high watermark for diffirentiating between the older (to be cancelled) + /// and the newer queries. + bool notifyWorkersOnCzarRestart() const { return _notifyWorkersOnCzarRestart != 0; } + private: CzarConfig(util::ConfigStore const& ConfigStore); @@ -196,6 +207,10 @@ class CzarConfig { // Parameters for QueryRequest PseudoFifo int const _qReqPseudoFifoMaxRunning; + + // Events sent to workers + int const _notifyWorkersOnQueryFinish; ///< Sent by cccontrol::UserQuerySelect + int const _notifyWorkersOnCzarRestart; ///< Sent by czar::Czar }; } // namespace lsst::qserv::czar From a0b20e561cf5b2fb16e76953f9714c1d53d2f33a Mon Sep 17 00:00:00 2001 From: Igor Gaponenko Date: Thu, 15 Jun 2023 00:50:52 -0700 Subject: [PATCH 8/8] Fixed a bug in the SQL library --- src/sql/SqlResults.cc | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/sql/SqlResults.cc b/src/sql/SqlResults.cc index baa6eccb27..c2cd8e8b0c 100644 --- a/src/sql/SqlResults.cc +++ b/src/sql/SqlResults.cc @@ -183,7 +183,12 @@ bool SqlResults::extractFirstValue(std::string& ret, SqlErrorObject& errObj) { if (!row) { return errObj.addErrMsg("Expecting one row, found no rows"); } - ret = (row[0]); + auto ptr = row[0]; + if (!ptr) { + freeResults(); + return errObj.addErrMsg("NULL returned by the query"); + } + ret = ptr; freeResults(); return true; }