Skip to content

Commit

Permalink
Moved the SSI request classes into a dedicated module xrdreq
Browse files Browse the repository at this point in the history
  • Loading branch information
iagaponenko committed Jun 20, 2023
1 parent 076f6e4 commit 19d6029
Show file tree
Hide file tree
Showing 45 changed files with 314 additions and 284 deletions.
2 changes: 2 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ add_subdirectory(wpublish)
add_subdirectory(wsched)
add_subdirectory(www)
add_subdirectory(xrdlog)
add_subdirectory(xrdreq)
add_subdirectory(xrdsvc)

#-----------------------------------------------------------------------------
Expand Down Expand Up @@ -127,6 +128,7 @@ target_link_libraries(qserv_czar PUBLIC
rproc
qserv_css
qserv_meta
xrdreq
)

install(
Expand Down
1 change: 1 addition & 0 deletions src/ccontrol/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ FUNCTION(ccontrol_tests)
qserv_meta
query
rproc
xrdreq
Boost::unit_test_framework
Threads::Threads
)
Expand Down
1 change: 1 addition & 0 deletions src/qana/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ FUNCTION(qana_tests)
qserv_css
qserv_meta
rproc
xrdreq
Boost::unit_test_framework
Threads::Threads
)
Expand Down
1 change: 1 addition & 0 deletions src/qdisp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ target_link_libraries(testQDisp
qserv_meta
query
rproc
xrdreq
Boost::unit_test_framework
Threads::Threads
)
Expand Down
1 change: 1 addition & 0 deletions src/qproc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ FUNCTION(qproc_tests)
qserv_css
qserv_meta
rproc
xrdreq
Boost::unit_test_framework
Threads::Threads
)
Expand Down
1 change: 1 addition & 0 deletions src/query/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ FUNCTION(query_tests)
qserv_meta
query
rproc
xrdreq
Boost::unit_test_framework
Threads::Threads
)
Expand Down
14 changes: 7 additions & 7 deletions src/replica/AddReplicaQservMgtRequest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,36 +73,36 @@ list<pair<string, string>> AddReplicaQservMgtRequest::extendedPersistentState()
void AddReplicaQservMgtRequest::startImpl(replica::Lock const& lock) {
auto const request = shared_from_base<AddReplicaQservMgtRequest>();

_qservRequest = wpublish::AddChunkGroupQservRequest::create(
_qservRequest = xrdreq::AddChunkGroupQservRequest::create(
chunk(), databases(),
[request](wpublish::ChunkGroupQservRequest::Status status, string const& error) {
[request](xrdreq::ChunkGroupQservRequest::Status status, string const& error) {
if (request->state() == State::FINISHED) return;

replica::Lock lock(request->_mtx, request->context() + string(__func__) + "[callback]");

if (request->state() == State::FINISHED) return;

switch (status) {
case wpublish::ChunkGroupQservRequest::Status::SUCCESS:
case xrdreq::ChunkGroupQservRequest::Status::SUCCESS:
request->finish(lock, QservMgtRequest::ExtendedState::SUCCESS);
break;

case wpublish::ChunkGroupQservRequest::Status::INVALID:
case xrdreq::ChunkGroupQservRequest::Status::INVALID:
request->finish(lock, QservMgtRequest::ExtendedState::SERVER_BAD, error);
break;

case wpublish::ChunkGroupQservRequest::Status::IN_USE:
case xrdreq::ChunkGroupQservRequest::Status::IN_USE:
request->finish(lock, QservMgtRequest::ExtendedState::SERVER_CHUNK_IN_USE, error);
break;

case wpublish::ChunkGroupQservRequest::Status::ERROR:
case xrdreq::ChunkGroupQservRequest::Status::ERROR:
request->finish(lock, QservMgtRequest::ExtendedState::SERVER_ERROR, error);
break;

default:
throw logic_error("AddReplicaQservMgtRequest::" + string(__func__) +
" unhandled server status: " +
wpublish::ChunkGroupQservRequest::status2str(status));
xrdreq::ChunkGroupQservRequest::status2str(status));
}
});
XrdSsiResource resource(ResourceUnit::makeWorkerPath(worker()));
Expand Down
4 changes: 2 additions & 2 deletions src/replica/AddReplicaQservMgtRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
// Qserv headers
#include "replica/QservMgtRequest.h"
#include "replica/ServiceProvider.h"
#include "wpublish/ChunkGroupQservRequest.h"
#include "xrdreq/ChunkGroupQservRequest.h"

// This header declarations

Expand Down Expand Up @@ -101,7 +101,7 @@ class AddReplicaQservMgtRequest : public QservMgtRequest {
CallbackType _onFinish; /// @note is reset when the request finishes

/// A request to the remote services
wpublish::AddChunkGroupQservRequest::Ptr _qservRequest;
xrdreq::AddChunkGroupQservRequest::Ptr _qservRequest;
};

} // namespace lsst::qserv::replica
Expand Down
5 changes: 4 additions & 1 deletion src/replica/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,7 @@ target_include_directories(replica PRIVATE

target_link_libraries(replica PUBLIC
qserv_css
xrdreq
xrdsvc
XrdCl
XrdSsiLib
Expand All @@ -483,7 +484,9 @@ FUNCTION(REPLICA_UTILS)
add_executable(${UTIL})
target_sources(${UTIL} PRIVATE tools/${UTIL}.cc)
target_include_directories(${UTIL} PRIVATE ${XROOTD_INCLUDE_DIRS})
target_link_libraries(${UTIL} PRIVATE replica)
target_link_libraries(${UTIL} PRIVATE
replica
)
install(TARGETS ${UTIL})
ENDFOREACH()
ENDFUNCTION()
Expand Down
14 changes: 7 additions & 7 deletions src/replica/GetReplicasQservMgtRequest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ list<pair<string, string>> GetReplicasQservMgtRequest::extendedPersistentState()
}

void GetReplicasQservMgtRequest::_setReplicas(
replica::Lock const& lock, wpublish::GetChunkListQservRequest::ChunkCollection const& collection) {
replica::Lock const& lock, xrdreq::GetChunkListQservRequest::ChunkCollection const& collection) {
// Filter results by databases participating in the family

set<string> databases;
Expand Down Expand Up @@ -111,31 +111,31 @@ void GetReplicasQservMgtRequest::startImpl(replica::Lock const& lock) {

auto const request = shared_from_base<GetReplicasQservMgtRequest>();

_qservRequest = wpublish::GetChunkListQservRequest::create(
inUseOnly(), [request](wpublish::GetChunkListQservRequest::Status status, string const& error,
wpublish::GetChunkListQservRequest::ChunkCollection const& collection) {
_qservRequest = xrdreq::GetChunkListQservRequest::create(
inUseOnly(), [request](xrdreq::GetChunkListQservRequest::Status status, string const& error,
xrdreq::GetChunkListQservRequest::ChunkCollection const& collection) {
if (request->state() == State::FINISHED) return;

replica::Lock lock(request->_mtx, request->context() + string(__func__) + "[callback]");

if (request->state() == State::FINISHED) return;

switch (status) {
case wpublish::GetChunkListQservRequest::Status::SUCCESS:
case xrdreq::GetChunkListQservRequest::Status::SUCCESS:

request->_setReplicas(lock, collection);
request->finish(lock, QservMgtRequest::ExtendedState::SUCCESS);
break;

case wpublish::GetChunkListQservRequest::Status::ERROR:
case xrdreq::GetChunkListQservRequest::Status::ERROR:

request->finish(lock, QservMgtRequest::ExtendedState::SERVER_ERROR, error);
break;

default:
throw logic_error("GetReplicasQservMgtRequest::" + string(__func__) +
" unhandled server status: " +
wpublish::GetChunkListQservRequest::status2str(status));
xrdreq::GetChunkListQservRequest::status2str(status));
}
});
XrdSsiResource resource(ResourceUnit::makeWorkerPath(worker()));
Expand Down
6 changes: 3 additions & 3 deletions src/replica/GetReplicasQservMgtRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
#include "replica/QservMgtRequest.h"
#include "replica/ReplicaInfo.h"
#include "replica/ServiceProvider.h"
#include "wpublish/GetChunkListQservRequest.h"
#include "xrdreq/GetChunkListQservRequest.h"

// This header declarations
namespace lsst::qserv::replica {
Expand Down Expand Up @@ -109,7 +109,7 @@ class GetReplicasQservMgtRequest : public QservMgtRequest {
* @param collection The input collection of replicas.
*/
void _setReplicas(replica::Lock const& lock,
wpublish::GetChunkListQservRequest::ChunkCollection const& collection);
xrdreq::GetChunkListQservRequest::ChunkCollection const& collection);

// Input parameters

Expand All @@ -118,7 +118,7 @@ class GetReplicasQservMgtRequest : public QservMgtRequest {
CallbackType _onFinish; /// @note is reset when the request finishes

/// A request to the remote services
wpublish::GetChunkListQservRequest::Ptr _qservRequest;
xrdreq::GetChunkListQservRequest::Ptr _qservRequest;

/// A collection of replicas reported by the Qserr worker
QservReplicaCollection _replicas;
Expand Down
12 changes: 6 additions & 6 deletions src/replica/GetStatusQservMgtRequest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,15 @@ list<pair<string, string>> GetStatusQservMgtRequest::extendedPersistentState() c

void GetStatusQservMgtRequest::startImpl(replica::Lock const& lock) {
auto const request = shared_from_base<GetStatusQservMgtRequest>();
_qservRequest = wpublish::GetStatusQservRequest::create(
_taskSelector, [request](wpublish::GetStatusQservRequest::Status status, string const& error,
string const& info) {
_qservRequest = xrdreq::GetStatusQservRequest::create(
_taskSelector,
[request](xrdreq::GetStatusQservRequest::Status status, string const& error, string const& info) {
if (request->state() == State::FINISHED) return;
replica::Lock const lock(request->_mtx, request->context() + string(__func__) + "[callback]");
if (request->state() == State::FINISHED) return;

switch (status) {
case wpublish::GetStatusQservRequest::Status::SUCCESS:
case xrdreq::GetStatusQservRequest::Status::SUCCESS:
try {
request->_setInfo(lock, info);
request->finish(lock, QservMgtRequest::ExtendedState::SUCCESS);
Expand All @@ -98,13 +98,13 @@ void GetStatusQservMgtRequest::startImpl(replica::Lock const& lock) {
request->finish(lock, QservMgtRequest::ExtendedState::SERVER_BAD_RESPONSE, msg);
}
break;
case wpublish::GetStatusQservRequest::Status::ERROR:
case xrdreq::GetStatusQservRequest::Status::ERROR:
request->finish(lock, QservMgtRequest::ExtendedState::SERVER_ERROR, error);
break;
default:
throw logic_error("GetStatusQservMgtRequest::" + string(__func__) +
" unhandled server status: " +
wpublish::GetStatusQservRequest::status2str(status));
xrdreq::GetStatusQservRequest::status2str(status));
}
});
XrdSsiResource resource(ResourceUnit::makeWorkerPath(worker()));
Expand Down
4 changes: 2 additions & 2 deletions src/replica/GetStatusQservMgtRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
#include "replica/QservMgtRequest.h"
#include "replica/ServiceProvider.h"
#include "wbase/TaskState.h"
#include "wpublish/GetStatusQservRequest.h"
#include "xrdreq/GetStatusQservRequest.h"

// This header declarations
namespace lsst::qserv::replica {
Expand Down Expand Up @@ -109,7 +109,7 @@ class GetStatusQservMgtRequest : public QservMgtRequest {
CallbackType _onFinish; ///< this object is reset after finishing the request

/// A request to the remote services
wpublish::GetStatusQservRequest::Ptr _qservRequest;
xrdreq::GetStatusQservRequest::Ptr _qservRequest;

/// The info object returned by the Qserv worker
nlohmann::json _info;
Expand Down
14 changes: 7 additions & 7 deletions src/replica/RemoveReplicaQservMgtRequest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,36 +76,36 @@ list<pair<string, string>> RemoveReplicaQservMgtRequest::extendedPersistentState
void RemoveReplicaQservMgtRequest::startImpl(replica::Lock const& lock) {
auto const request = shared_from_base<RemoveReplicaQservMgtRequest>();

_qservRequest = wpublish::RemoveChunkGroupQservRequest::create(
_qservRequest = xrdreq::RemoveChunkGroupQservRequest::create(
chunk(), databases(), force(),
[request](wpublish::ChunkGroupQservRequest::Status status, string const& error) {
[request](xrdreq::ChunkGroupQservRequest::Status status, string const& error) {
if (request->state() == State::FINISHED) return;

replica::Lock lock(request->_mtx, request->context() + string(__func__) + "[callback]");

if (request->state() == State::FINISHED) return;

switch (status) {
case wpublish::ChunkGroupQservRequest::Status::SUCCESS:
case xrdreq::ChunkGroupQservRequest::Status::SUCCESS:
request->finish(lock, QservMgtRequest::ExtendedState::SUCCESS);
break;

case wpublish::ChunkGroupQservRequest::Status::INVALID:
case xrdreq::ChunkGroupQservRequest::Status::INVALID:
request->finish(lock, QservMgtRequest::ExtendedState::SERVER_BAD, error);
break;

case wpublish::ChunkGroupQservRequest::Status::IN_USE:
case xrdreq::ChunkGroupQservRequest::Status::IN_USE:
request->finish(lock, QservMgtRequest::ExtendedState::SERVER_CHUNK_IN_USE, error);
break;

case wpublish::ChunkGroupQservRequest::Status::ERROR:
case xrdreq::ChunkGroupQservRequest::Status::ERROR:
request->finish(lock, QservMgtRequest::ExtendedState::SERVER_ERROR, error);
break;

default:
throw logic_error("RemoveReplicaQservMgtRequest::" + string(__func__) +
" unhandled server status: " +
wpublish::ChunkGroupQservRequest::status2str(status));
xrdreq::ChunkGroupQservRequest::status2str(status));
}
});
XrdSsiResource resource(ResourceUnit::makeWorkerPath(worker()));
Expand Down
4 changes: 2 additions & 2 deletions src/replica/RemoveReplicaQservMgtRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
// Qserv headers
#include "replica/QservMgtRequest.h"
#include "replica/ServiceProvider.h"
#include "wpublish/ChunkGroupQservRequest.h"
#include "xrdreq/ChunkGroupQservRequest.h"

// This header declarations
namespace lsst::qserv::replica {
Expand Down Expand Up @@ -107,7 +107,7 @@ class RemoveReplicaQservMgtRequest : public QservMgtRequest {
CallbackType _onFinish;

/// A request to the remote services
wpublish::RemoveChunkGroupQservRequest::Ptr _qservRequest;
xrdreq::RemoveChunkGroupQservRequest::Ptr _qservRequest;
};

} // namespace lsst::qserv::replica
Expand Down
22 changes: 11 additions & 11 deletions src/replica/SetReplicasQservMgtRequest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,54 +88,54 @@ list<pair<string, string>> SetReplicasQservMgtRequest::extendedPersistentState()
}

void SetReplicasQservMgtRequest::_setReplicas(
replica::Lock const& lock, wpublish::SetChunkListQservRequest::ChunkCollection const& collection) {
replica::Lock const& lock, xrdreq::SetChunkListQservRequest::ChunkCollection const& collection) {
_replicas.clear();
for (auto&& replica : collection) {
_replicas.push_back(QservReplica{replica.chunk, replica.database, replica.use_count});
}
}

void SetReplicasQservMgtRequest::startImpl(replica::Lock const& lock) {
wpublish::SetChunkListQservRequest::ChunkCollection chunks;
xrdreq::SetChunkListQservRequest::ChunkCollection chunks;
for (auto&& chunkEntry : newReplicas()) {
chunks.push_back(wpublish::SetChunkListQservRequest::Chunk{
chunks.push_back(xrdreq::SetChunkListQservRequest::Chunk{
chunkEntry.chunk, chunkEntry.database, 0 /* UNUSED: use_count */
});
}
auto const request = shared_from_base<SetReplicasQservMgtRequest>();

_qservRequest = wpublish::SetChunkListQservRequest::create(
_qservRequest = xrdreq::SetChunkListQservRequest::create(
chunks, _databases, force(),
[request](wpublish::SetChunkListQservRequest::Status status, string const& error,
wpublish::SetChunkListQservRequest::ChunkCollection const& collection) {
[request](xrdreq::SetChunkListQservRequest::Status status, string const& error,
xrdreq::SetChunkListQservRequest::ChunkCollection const& collection) {
if (request->state() == State::FINISHED) return;

replica::Lock lock(request->_mtx, request->context() + string(__func__) + "[callback]");

if (request->state() == State::FINISHED) return;

switch (status) {
case wpublish::SetChunkListQservRequest::Status::SUCCESS:
case xrdreq::SetChunkListQservRequest::Status::SUCCESS:
request->_setReplicas(lock, collection);
request->finish(lock, QservMgtRequest::ExtendedState::SUCCESS);
break;

case wpublish::SetChunkListQservRequest::Status::ERROR:
case xrdreq::SetChunkListQservRequest::Status::ERROR:
request->finish(lock, QservMgtRequest::ExtendedState::SERVER_ERROR, error);
break;

case wpublish::SetChunkListQservRequest::Status::INVALID:
case xrdreq::SetChunkListQservRequest::Status::INVALID:
request->finish(lock, QservMgtRequest::ExtendedState::SERVER_BAD, error);
break;

case wpublish::SetChunkListQservRequest::Status::IN_USE:
case xrdreq::SetChunkListQservRequest::Status::IN_USE:
request->finish(lock, QservMgtRequest::ExtendedState::SERVER_CHUNK_IN_USE, error);
break;

default:
throw logic_error("SetReplicasQservMgtRequest:: " + string(__func__) +
" unhandled server status: " +
wpublish::SetChunkListQservRequest::status2str(status));
xrdreq::SetChunkListQservRequest::status2str(status));
}
});
XrdSsiResource resource(ResourceUnit::makeWorkerPath(worker()));
Expand Down
Loading

0 comments on commit 19d6029

Please sign in to comment.