From ea3474fe989367c06f94d50489cda9fb91576031 Mon Sep 17 00:00:00 2001 From: Igor Gaponenko Date: Fri, 6 Dec 2024 12:12:12 -0800 Subject: [PATCH] Eliminated the unused Controller's function of tracking duplicate requests This function that was provisioned in the original design of the Replication System has shown any benefits. --- src/replica/apps/ControllerApp.cc | 10 +--- src/replica/apps/ControllerApp.h | 5 -- src/replica/jobs/CreateReplicaJob.cc | 3 +- src/replica/jobs/DeleteReplicaJob.cc | 3 +- src/replica/jobs/FixUpJob.cc | 3 +- src/replica/proto/protocol.proto | 7 --- src/replica/requests/DeleteRequest.cc | 21 +++------ src/replica/requests/DeleteRequest.h | 6 +-- src/replica/requests/DirectorIndexRequest.cc | 4 +- src/replica/requests/DisposeRequest.cc | 4 +- src/replica/requests/EchoRequest.cc | 4 +- src/replica/requests/FindAllRequest.cc | 3 +- src/replica/requests/FindRequest.cc | 4 +- src/replica/requests/ReplicationRequest.cc | 21 ++------- src/replica/requests/ReplicationRequest.h | 5 +- src/replica/requests/Request.cc | 4 +- src/replica/requests/Request.h | 19 ++------ src/replica/requests/RequestMessenger.cc | 4 +- src/replica/requests/RequestMessenger.h | 3 +- .../requests/ServiceManagementRequestBase.cc | 3 +- src/replica/requests/SqlRequest.cc | 4 +- src/replica/requests/StatusRequest.cc | 3 +- src/replica/requests/StopRequest.cc | 3 +- src/replica/worker/WorkerProcessor.cc | 46 ------------------- 24 files changed, 37 insertions(+), 155 deletions(-) diff --git a/src/replica/apps/ControllerApp.cc b/src/replica/apps/ControllerApp.cc index 01c82a53d..67d3b0017 100644 --- a/src/replica/apps/ControllerApp.cc +++ b/src/replica/apps/ControllerApp.cc @@ -129,11 +129,6 @@ void ControllerApp::_configureParser() { _cancelDelayMilliseconds) .option("priority", "The priority level of a request", _priority) .flag("do-not-track", "Do not track requests by waiting before they finish.", _doNotTrackRequest) - .flag("allow-duplicates", - "Allow requests which duplicate the previously made one. This applies" - " to requests which change the replica disposition at a worker, and only" - " for those requests which are still in the worker's queues.", - _allowDuplicates) .flag("do-not-save-replica", "The flag which (if used) prevents the application from saving replica info in a database." " This may significantly speed up the application in setups where the number of chunks is " @@ -415,11 +410,10 @@ int ControllerApp::runImpl() { request = ReplicationRequest::createAndStart( controller, _workerName, _sourceWorkerName, _databaseName, _chunkNumber, [](ReplicationRequest::Ptr const& request_) { request_->print(); }, _priority, - !_doNotTrackRequest, _allowDuplicates); + !_doNotTrackRequest); } else if ("DELETE" == _requestType) { request = DeleteRequest::createAndStart(controller, _workerName, _databaseName, _chunkNumber, - Request::defaultPrinter, _priority, !_doNotTrackRequest, - _allowDuplicates); + Request::defaultPrinter, _priority, !_doNotTrackRequest); } else if ("FIND" == _requestType) { request = FindRequest::createAndStart(controller, _workerName, _databaseName, _chunkNumber, Request::defaultPrinter, _priority, _computeCheckSum, diff --git a/src/replica/apps/ControllerApp.h b/src/replica/apps/ControllerApp.h index 5b6a100a0..8fc349d4b 100644 --- a/src/replica/apps/ControllerApp.h +++ b/src/replica/apps/ControllerApp.h @@ -171,11 +171,6 @@ class ControllerApp : public Application { /// Do not track requests waiting before they finish bool _doNotTrackRequest = false; - /// Allow requests which duplicate the previously made one. This applies - /// to requests which change the replica disposition at a worker, and only - /// for those requests which are still in the worker's queues. - bool _allowDuplicates = false; - /// Do not save the replica info in the database if set to 'true' bool _doNotSaveReplicaInfo = false; diff --git a/src/replica/jobs/CreateReplicaJob.cc b/src/replica/jobs/CreateReplicaJob.cc index 94a393ca8..cf6c8e5c9 100644 --- a/src/replica/jobs/CreateReplicaJob.cc +++ b/src/replica/jobs/CreateReplicaJob.cc @@ -203,14 +203,13 @@ void CreateReplicaJob::startImpl(replica::Lock const& lock) { // VERY IMPORTANT: the requests are sent for participating databases // only because some catalogs may not have a full coverage bool const keepTracking = true; - bool const allowDuplicate = true; for (auto&& replica : sourceReplicas) { _requests.push_back(ReplicationRequest::createAndStart( controller(), destinationWorker(), sourceWorker(), replica.database(), chunk(), [self = shared_from_base()](ReplicationRequest::Ptr ptr) { self->_onRequestFinish(ptr); }, - priority(), keepTracking, allowDuplicate, id())); + priority(), keepTracking, id())); } } diff --git a/src/replica/jobs/DeleteReplicaJob.cc b/src/replica/jobs/DeleteReplicaJob.cc index b9c288727..b8ef4d7f6 100644 --- a/src/replica/jobs/DeleteReplicaJob.cc +++ b/src/replica/jobs/DeleteReplicaJob.cc @@ -235,14 +235,13 @@ void DeleteReplicaJob::_beginDeleteReplica(replica::Lock const& lock) { // VERY IMPORTANT: the requests are sent for participating databases // only because some catalogs may not have a full coverage bool const keepTracking = true; - bool const allowDuplicate = true; for (auto&& replica : _replicas) { _requests.push_back(DeleteRequest::createAndStart( controller(), workerName(), replica.database(), chunk(), [self = shared_from_base()](DeleteRequest::Ptr ptr) { self->_onRequestFinish(ptr); }, - priority(), keepTracking, allowDuplicate, id())); + priority(), keepTracking, id())); } } diff --git a/src/replica/jobs/FixUpJob.cc b/src/replica/jobs/FixUpJob.cc index 8aa67874e..5393418c8 100644 --- a/src/replica/jobs/FixUpJob.cc +++ b/src/replica/jobs/FixUpJob.cc @@ -257,7 +257,6 @@ size_t FixUpJob::_launchNext(replica::Lock const& lock, string const& destinatio if (maxRequests == 0) return 0; auto&& tasks = _destinationWorker2tasks[destinationWorker]; bool const keepTracking = true; - bool const allowDuplicate = true; size_t numLaunched = 0; for (size_t i = 0; i < maxRequests; ++i) { if (tasks.size() == 0) break; @@ -270,7 +269,7 @@ size_t FixUpJob::_launchNext(replica::Lock const& lock, string const& destinatio [self = shared_from_base()](ReplicationRequest::Ptr ptr) { self->_onRequestFinish(ptr); }, - priority(), keepTracking, allowDuplicate, id())); + priority(), keepTracking, id())); tasks.pop(); numLaunched++; } diff --git a/src/replica/proto/protocol.proto b/src/replica/proto/protocol.proto index b50558de4..5e2fd54dd 100644 --- a/src/replica/proto/protocol.proto +++ b/src/replica/proto/protocol.proto @@ -399,7 +399,6 @@ enum ProtocolStatusExt { NONE = 0; // unspecified problem INVALID_PARAM = 1; // invalid parameter(s) of a request INVALID_ID = 2; // an invalid request identifier - DUPLICATE = 3; // a duplicate request FOLDER_STAT = 4; // failed to obtain fstat() for a folder FOLDER_CREATE = 5; // failed to create a folder FILE_STAT = 6; // failed to obtain fstat() for a file @@ -487,9 +486,6 @@ message ProtocolResponseReplicate { /// Extended status of this operation optional ProtocolStatusExt status_ext = 2 [default = NONE]; - /// The field is set for duplicate requests only - optional string duplicate_request_id = 3 [default = ""]; - /// The performance of this operation required ProtocolPerformance performance = 4; @@ -519,9 +515,6 @@ message ProtocolResponseDelete { /// Extended status of this operation optional ProtocolStatusExt status_ext = 2 [default = NONE]; - /// The field is set for duplicate requests only - optional string duplicate_request_id = 3 [default = ""]; - /// The performance of this operation required ProtocolPerformance performance = 4; diff --git a/src/replica/requests/DeleteRequest.cc b/src/replica/requests/DeleteRequest.cc index d8f6163a6..1852a33a2 100644 --- a/src/replica/requests/DeleteRequest.cc +++ b/src/replica/requests/DeleteRequest.cc @@ -53,18 +53,18 @@ namespace lsst::qserv::replica { DeleteRequest::Ptr DeleteRequest::createAndStart(shared_ptr const& controller, string const& workerName, string const& database, unsigned int chunk, CallbackType const& onFinish, - int priority, bool keepTracking, bool allowDuplicate, - string const& jobId, unsigned int requestExpirationIvalSec) { - auto ptr = DeleteRequest::Ptr(new DeleteRequest(controller, workerName, database, chunk, onFinish, - priority, keepTracking, allowDuplicate)); + int priority, bool keepTracking, string const& jobId, + unsigned int requestExpirationIvalSec) { + auto ptr = DeleteRequest::Ptr( + new DeleteRequest(controller, workerName, database, chunk, onFinish, priority, keepTracking)); ptr->start(jobId, requestExpirationIvalSec); return ptr; } DeleteRequest::DeleteRequest(shared_ptr const& controller, string const& workerName, string const& database, unsigned int chunk, CallbackType const& onFinish, - int priority, bool keepTracking, bool allowDuplicate) - : RequestMessenger(controller, "REPLICA_DELETE", workerName, priority, keepTracking, allowDuplicate, + int priority, bool keepTracking) + : RequestMessenger(controller, "REPLICA_DELETE", workerName, priority, keepTracking, ::disposeRequired), _database(database), _chunk(chunk), @@ -194,15 +194,6 @@ void DeleteRequest::_analyze(bool success, ProtocolResponseDelete const& message break; case ProtocolStatus::BAD: - // Special treatment of the duplicate requests if allowed - if (extendedServerStatus() == ProtocolStatusExt::DUPLICATE) { - setDuplicateRequestId(lock, message.duplicate_request_id()); - if (allowDuplicate() && keepTracking()) { - timer().expires_from_now(boost::posix_time::milliseconds(nextTimeIvalMsec())); - timer().async_wait(bind(&DeleteRequest::awaken, shared_from_base(), _1)); - return; - } - } finish(lock, SERVER_BAD); break; diff --git a/src/replica/requests/DeleteRequest.h b/src/replica/requests/DeleteRequest.h index e705c27f0..bab621f17 100644 --- a/src/replica/requests/DeleteRequest.h +++ b/src/replica/requests/DeleteRequest.h @@ -88,8 +88,8 @@ class DeleteRequest : public RequestMessenger { static Ptr createAndStart(std::shared_ptr const& controller, std::string const& workerName, std::string const& database, unsigned int chunk, CallbackType const& onFinish = nullptr, int priority = PRIORITY_NORMAL, - bool keepTracking = true, bool allowDuplicate = true, - std::string const& jobId = "", unsigned int requestExpirationIvalSec = 0); + bool keepTracking = true, std::string const& jobId = "", + unsigned int requestExpirationIvalSec = 0); protected: void startImpl(replica::Lock const& lock) final; @@ -101,7 +101,7 @@ class DeleteRequest : public RequestMessenger { private: DeleteRequest(std::shared_ptr const& controller, std::string const& workerName, std::string const& database, unsigned int chunk, CallbackType const& onFinish, int priority, - bool keepTracking, bool allowDuplicate); + bool keepTracking); /** * Send the serialized content of the buffer to a worker. diff --git a/src/replica/requests/DirectorIndexRequest.cc b/src/replica/requests/DirectorIndexRequest.cc index 0b469810f..bc1b144ed 100644 --- a/src/replica/requests/DirectorIndexRequest.cc +++ b/src/replica/requests/DirectorIndexRequest.cc @@ -48,7 +48,6 @@ namespace fs = boost::filesystem; namespace { LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.DirectorIndexRequest"); -bool const allowDuplicateNo = false; bool const disposeRequired = true; } // namespace @@ -77,8 +76,7 @@ DirectorIndexRequest::DirectorIndexRequest(std::shared_ptr const& co string const& directorTable, unsigned int chunk, bool hasTransactions, TransactionId transactionId, CallbackType const& onFinish, int priority, bool keepTracking) - : RequestMessenger(controller, "INDEX", workerName, priority, keepTracking, ::allowDuplicateNo, - ::disposeRequired), + : RequestMessenger(controller, "INDEX", workerName, priority, keepTracking, ::disposeRequired), _database(database), _directorTable(directorTable), _chunk(chunk), diff --git a/src/replica/requests/DisposeRequest.cc b/src/replica/requests/DisposeRequest.cc index 699707158..0f57b28d7 100644 --- a/src/replica/requests/DisposeRequest.cc +++ b/src/replica/requests/DisposeRequest.cc @@ -41,7 +41,6 @@ using namespace std::placeholders; namespace { LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.DisposeRequest"); -bool const allowDuplicateNo = false; bool const disposeRequiredNo = false; } // namespace @@ -83,8 +82,7 @@ DisposeRequest::Ptr DisposeRequest::createAndStart(shared_ptr const& DisposeRequest::DisposeRequest(shared_ptr const& controller, string const& workerName, std::vector const& targetIds, CallbackType const& onFinish, int priority, bool keepTracking) - : RequestMessenger(controller, "DISPOSE", workerName, priority, keepTracking, ::allowDuplicateNo, - ::disposeRequiredNo), + : RequestMessenger(controller, "DISPOSE", workerName, priority, keepTracking, ::disposeRequiredNo), _targetIds(targetIds), _onFinish(onFinish) {} diff --git a/src/replica/requests/EchoRequest.cc b/src/replica/requests/EchoRequest.cc index d713128e6..a06644543 100644 --- a/src/replica/requests/EchoRequest.cc +++ b/src/replica/requests/EchoRequest.cc @@ -45,7 +45,6 @@ using namespace std::placeholders; namespace { LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.EchoRequest"); -bool const allowDuplicateNo = false; bool const disposeRequired = true; } // namespace @@ -65,8 +64,7 @@ EchoRequest::Ptr EchoRequest::createAndStart(shared_ptr const& contr EchoRequest::EchoRequest(shared_ptr const& controller, string const& workerName, string const& data, uint64_t delay, CallbackType const& onFinish, int priority, bool keepTracking) - : RequestMessenger(controller, "TEST_ECHO", workerName, priority, keepTracking, ::allowDuplicateNo, - ::disposeRequired), + : RequestMessenger(controller, "TEST_ECHO", workerName, priority, keepTracking, ::disposeRequired), _data(data), _delay(delay), _onFinish(onFinish) {} diff --git a/src/replica/requests/FindAllRequest.cc b/src/replica/requests/FindAllRequest.cc index c3f501fda..b4fa178fe 100644 --- a/src/replica/requests/FindAllRequest.cc +++ b/src/replica/requests/FindAllRequest.cc @@ -45,7 +45,6 @@ using namespace std::placeholders; namespace { LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.FindAllRequest"); -bool const allowDuplicateNo = false; bool const disposeRequired = true; } // namespace @@ -66,7 +65,7 @@ FindAllRequest::FindAllRequest(shared_ptr const& controller, string string const& database, bool saveReplicaInfo, CallbackType const& onFinish, int priority, bool keepTracking) : RequestMessenger(controller, "REPLICA_FIND_ALL", workerName, priority, keepTracking, - ::allowDuplicateNo, ::disposeRequired), + ::disposeRequired), _database(database), _saveReplicaInfo(saveReplicaInfo), _onFinish(onFinish) { diff --git a/src/replica/requests/FindRequest.cc b/src/replica/requests/FindRequest.cc index fc6d25547..4a23224d8 100644 --- a/src/replica/requests/FindRequest.cc +++ b/src/replica/requests/FindRequest.cc @@ -46,7 +46,6 @@ using namespace std::placeholders; namespace { LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.FindRequest"); -bool const allowDuplicateNo = false; bool const disposeRequired = true; } // namespace @@ -66,8 +65,7 @@ FindRequest::Ptr FindRequest::createAndStart(shared_ptr const& contr FindRequest::FindRequest(shared_ptr const& controller, string const& workerName, string const& database, unsigned int chunk, CallbackType const& onFinish, int priority, bool computeCheckSum, bool keepTracking) - : RequestMessenger(controller, "REPLICA_FIND", workerName, priority, keepTracking, ::allowDuplicateNo, - ::disposeRequired), + : RequestMessenger(controller, "REPLICA_FIND", workerName, priority, keepTracking, ::disposeRequired), _database(database), _chunk(chunk), _computeCheckSum(computeCheckSum), diff --git a/src/replica/requests/ReplicationRequest.cc b/src/replica/requests/ReplicationRequest.cc index f44d8e436..0fd679b8e 100644 --- a/src/replica/requests/ReplicationRequest.cc +++ b/src/replica/requests/ReplicationRequest.cc @@ -53,10 +53,9 @@ namespace lsst::qserv::replica { ReplicationRequest::Ptr ReplicationRequest::createAndStart( shared_ptr const& controller, string const& workerName, string const& sourceWorkerName, string const& database, unsigned int chunk, CallbackType const& onFinish, int priority, - bool keepTracking, bool allowDuplicate, string const& jobId, unsigned int requestExpirationIvalSec) { - auto ptr = ReplicationRequest::Ptr(new ReplicationRequest(controller, workerName, sourceWorkerName, - database, chunk, onFinish, priority, - keepTracking, allowDuplicate)); + bool keepTracking, string const& jobId, unsigned int requestExpirationIvalSec) { + auto ptr = ReplicationRequest::Ptr(new ReplicationRequest( + controller, workerName, sourceWorkerName, database, chunk, onFinish, priority, keepTracking)); ptr->start(jobId, requestExpirationIvalSec); return ptr; } @@ -64,8 +63,8 @@ ReplicationRequest::Ptr ReplicationRequest::createAndStart( ReplicationRequest::ReplicationRequest(shared_ptr const& controller, string const& workerName, string const& sourceWorkerName, string const& database, unsigned int chunk, CallbackType const& onFinish, int priority, - bool keepTracking, bool allowDuplicate) - : RequestMessenger(controller, "REPLICA_CREATE", workerName, priority, keepTracking, allowDuplicate, + bool keepTracking) + : RequestMessenger(controller, "REPLICA_CREATE", workerName, priority, keepTracking, ::disposeRequired), _database(database), _chunk(chunk), @@ -197,16 +196,6 @@ void ReplicationRequest::_analyze(bool success, ProtocolResponseReplicate const& keepTrackingOrFinish(lock, SERVER_IS_CANCELLING); break; case ProtocolStatus::BAD: - // Special treatment of the duplicate requests if allowed - if (extendedServerStatus() == ProtocolStatusExt::DUPLICATE) { - setDuplicateRequestId(lock, message.duplicate_request_id()); - if (allowDuplicate() && keepTracking()) { - timer().expires_from_now(boost::posix_time::milliseconds(nextTimeIvalMsec())); - timer().async_wait( - bind(&ReplicationRequest::awaken, shared_from_base(), _1)); - return; - } - } finish(lock, SERVER_BAD); break; case ProtocolStatus::FAILED: diff --git a/src/replica/requests/ReplicationRequest.h b/src/replica/requests/ReplicationRequest.h index 33943965f..74c08c604 100644 --- a/src/replica/requests/ReplicationRequest.h +++ b/src/replica/requests/ReplicationRequest.h @@ -94,8 +94,7 @@ class ReplicationRequest : public RequestMessenger { std::string const& sourceWorkerName, std::string const& database, unsigned int chunk, CallbackType const& onFinish = nullptr, int priority = PRIORITY_NORMAL, bool keepTracking = true, - bool allowDuplicate = true, std::string const& jobId = "", - unsigned int requestExpirationIvalSec = 0); + std::string const& jobId = "", unsigned int requestExpirationIvalSec = 0); /// @see Request::extendedPersistentState() std::list> extendedPersistentState() const override; @@ -109,7 +108,7 @@ class ReplicationRequest : public RequestMessenger { private: ReplicationRequest(std::shared_ptr const& controller, std::string const& workerName, std::string const& sourceWorkerName, std::string const& database, unsigned int chunk, - CallbackType const& onFinish, int priority, bool keepTracking, bool allowDuplicate); + CallbackType const& onFinish, int priority, bool keepTracking); /** * Send the serialized content of the buffer to a worker diff --git a/src/replica/requests/Request.cc b/src/replica/requests/Request.cc index 4f6c2a617..267606d45 100644 --- a/src/replica/requests/Request.cc +++ b/src/replica/requests/Request.cc @@ -105,14 +105,13 @@ string Request::state2string(State state, ExtendedState extendedState, } Request::Request(shared_ptr const& controller, string const& type, string const& workerName, - int priority, bool keepTracking, bool allowDuplicate, bool disposeRequired) + int priority, bool keepTracking, bool disposeRequired) : _controller(controller), _type(type), _id(Generators::uniqueId()), _workerName(workerName), _priority(priority), _keepTracking(keepTracking), - _allowDuplicate(allowDuplicate), _disposeRequired(disposeRequired), _state(CREATED), _extendedState(NONE), @@ -170,7 +169,6 @@ string Request::toString(bool extended) const { << " worker: " << workerName() << "\n" << " priority: " << priority() << "\n" << " keepTracking: " << bool2str(keepTracking()) << "\n" - << " allowDuplicate: " << bool2str(allowDuplicate()) << "\n" << " disposeRequired: " << bool2str(disposeRequired()) << "\n" << " remoteId: " << remoteId() << "\n" << " performance: " << performance() << "\n"; diff --git a/src/replica/requests/Request.h b/src/replica/requests/Request.h index 3f66b4ece..50ade7f5f 100644 --- a/src/replica/requests/Request.h +++ b/src/replica/requests/Request.h @@ -62,7 +62,6 @@ namespace lsst::qserv::replica { * the request. The functin type is specific for each subclass. * @param priority The (optional) priority level of the request. * @param keepTracking The (optional) flagg to keep tracking the request before it finishes or fails. - * @param allowDuplicate (optional) Follow a previously made request if the current one duplicates it. * @param jobId The (optional) unique identifier of a job to which the request belongs. * @param requestExpirationIvalSec The (optional) time in seconds after which the request * will expire. The default value of '0' means an effective expiration time will be pull @@ -162,12 +161,7 @@ class Request : public std::enable_shared_from_this { /// @return a unique identifier of the request std::string const& id() const { return _id; } - /** - * Normally this is the same request as the one a request object is created with - * unless allowing to track duplicate requests (see constructor's options: 'keepTracking' - * and 'allowDuplicate') and after the one is found. - * @return an effective identifier of a remote (worker-side) request. - */ + /// @return an effective identifier of a remote (worker-side) request std::string const& remoteId() const; /// @return the priority level of the request @@ -264,7 +258,7 @@ class Request : public std::enable_shared_from_this { /** * Construct the request with the pointer to the services provider. * - * @note options 'keepTracking', 'allowDuplicate' and 'disposeRequired' + * @note options 'keepTracking' and 'disposeRequired' * have effect for specific request only. * * @param controller The Controller associated with the request. @@ -275,15 +269,12 @@ class Request : public std::enable_shared_from_this { * the request by the worker service. It may also affect an order requests * are processed locally. Higher number means higher priority. * @param keepTracking Keep tracking the request before it finishes or fails - * @param allowDuplicate Follow a previously made request if the current one - * duplicates it. * @param disposeRequired The flag indicating of the worker-side request * disposal is needed for a particular request. Normally, it's required for * requests which are queued by workers in its processing queues. */ Request(std::shared_ptr const& controller, std::string const& type, - std::string const& workerName, int priority, bool keepTracking, bool allowDuplicate, - bool disposeRequired); + std::string const& workerName, int priority, bool keepTracking, bool disposeRequired); /// @return A shared pointer of the desired subclass (no dynamic type checking) template @@ -311,9 +302,6 @@ class Request : public std::enable_shared_from_this { /// @return If 'true' then track request completion (queued requests only) bool keepTracking() const { return _keepTracking; } - /// @return If 'true' then follow a previously made request if the current one duplicates it. - bool allowDuplicate() const { return _allowDuplicate; } - /// @return If 'true' the request needs to be disposed at the worker's side upon /// a completion of an operation. bool disposeRequired() const { return _disposeRequired; } @@ -535,7 +523,6 @@ class Request : public std::enable_shared_from_this { int const _priority; bool const _keepTracking; - bool const _allowDuplicate; bool const _disposeRequired; /// An effective identifier of a remote (worker-side) request where diff --git a/src/replica/requests/RequestMessenger.cc b/src/replica/requests/RequestMessenger.cc index 032f68b36..cfb4e5e8a 100644 --- a/src/replica/requests/RequestMessenger.cc +++ b/src/replica/requests/RequestMessenger.cc @@ -44,8 +44,8 @@ namespace lsst::qserv::replica { RequestMessenger::RequestMessenger(shared_ptr const& controller, string const& type, string const& workerName, int priority, bool keepTracking, - bool allowDuplicate, bool disposeRequired) - : Request(controller, type, workerName, priority, keepTracking, allowDuplicate, disposeRequired) {} + bool disposeRequired) + : Request(controller, type, workerName, priority, keepTracking, disposeRequired) {} void RequestMessenger::finishImpl(replica::Lock const& lock) { LOGS(_log, LOG_LVL_DEBUG, context() << __func__); diff --git a/src/replica/requests/RequestMessenger.h b/src/replica/requests/RequestMessenger.h index 13802907d..e91996c1d 100644 --- a/src/replica/requests/RequestMessenger.h +++ b/src/replica/requests/RequestMessenger.h @@ -70,8 +70,7 @@ class RequestMessenger : public Request { * @return A pointer to the created object. */ RequestMessenger(std::shared_ptr const& controller, std::string const& type, - std::string const& workerName, int priority, bool keepTracking, bool allowDuplicate, - bool disposeRequired); + std::string const& workerName, int priority, bool keepTracking, bool disposeRequired); /// @see Request::finishImpl() void finishImpl(replica::Lock const& lock) override; diff --git a/src/replica/requests/ServiceManagementRequestBase.cc b/src/replica/requests/ServiceManagementRequestBase.cc index 2919899bf..f56a757b3 100644 --- a/src/replica/requests/ServiceManagementRequestBase.cc +++ b/src/replica/requests/ServiceManagementRequestBase.cc @@ -57,7 +57,6 @@ void dumpRequestInfo(ostream& os, vector const& req } bool const keepTrackingNo = false; -bool const allowDuplicateNo = false; bool const disposeRequiredNo = false; } // namespace @@ -170,7 +169,7 @@ ServiceManagementRequestBase::ServiceManagementRequestBase(shared_ptr const& controller, std::string const& requestName, string const& workerName, uint64_t maxRows, int priority, bool keepTracking) - : RequestMessenger(controller, requestName, workerName, priority, keepTracking, ::allowDuplicateNo, - ::disposeRequired) { + : RequestMessenger(controller, requestName, workerName, priority, keepTracking, ::disposeRequired) { // Partial initialization of the request body's content. Other members // will be set in the request type-specific subclasses. requestBody.set_max_rows(maxRows); diff --git a/src/replica/requests/StatusRequest.cc b/src/replica/requests/StatusRequest.cc index 237ef3716..1826e8c6a 100644 --- a/src/replica/requests/StatusRequest.cc +++ b/src/replica/requests/StatusRequest.cc @@ -39,7 +39,6 @@ using namespace std; namespace { LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.StatusRequest"); -bool const allowDuplicateNo = false; bool const disposeRequiredNo = false; } // namespace @@ -60,7 +59,7 @@ StatusRequest::StatusRequest(shared_ptr const& controller, string co string const& targetRequestId, CallbackType const& onFinish, int priority, bool keepTracking) : RequestMessenger(controller, "REQUEST_STATUS", workerName, priority, keepTracking, - ::allowDuplicateNo, ::disposeRequiredNo), + ::disposeRequiredNo), _targetRequestId(targetRequestId), _onFinish(onFinish) {} diff --git a/src/replica/requests/StopRequest.cc b/src/replica/requests/StopRequest.cc index 21dadc1e2..4f1bfd3a3 100644 --- a/src/replica/requests/StopRequest.cc +++ b/src/replica/requests/StopRequest.cc @@ -36,7 +36,6 @@ using namespace std; namespace { LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.StopRequest"); -bool const allowDuplicateNo = false; bool const disposeRequiredNo = false; } // namespace @@ -55,7 +54,7 @@ StopRequest::Ptr StopRequest::createAndStart(shared_ptr const& contr StopRequest::StopRequest(shared_ptr const& controller, string const& workerName, string const& targetRequestId, CallbackType const& onFinish, int priority, bool keepTracking) - : RequestMessenger(controller, "REQUEST_STOP", workerName, priority, keepTracking, ::allowDuplicateNo, + : RequestMessenger(controller, "REQUEST_STOP", workerName, priority, keepTracking, ::disposeRequiredNo), _targetRequestId(targetRequestId), _onFinish(onFinish) {} diff --git a/src/replica/worker/WorkerProcessor.cc b/src/replica/worker/WorkerProcessor.cc index d1ddca60f..bbae7a627 100644 --- a/src/replica/worker/WorkerProcessor.cc +++ b/src/replica/worker/WorkerProcessor.cc @@ -49,31 +49,7 @@ using namespace std::placeholders; using namespace lsst::qserv::replica; namespace { - LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.WorkerProcessor"); - -template -bool ifDuplicateRequest(PROTOCOL_RESPONSE_TYPE& response, WorkerRequest::Ptr const& p, - PROTOCOL_REQUEST_TYPE const& request) { - bool isDuplicate = false; - - auto const ptr = dynamic_pointer_cast(p); - if (nullptr != ptr) { - isDuplicate = (ptr->database() == request.database()) and (ptr->chunk() == request.chunk()); - - } else { - auto const ptr = dynamic_pointer_cast(p); - if (nullptr != ptr) { - isDuplicate = (ptr->database() == request.database()) and (ptr->chunk() == request.chunk()); - } - } - if (isDuplicate) { - WorkerProcessor::setDefaultResponse(response, ProtocolStatus::BAD, ProtocolStatusExt::DUPLICATE); - response.set_duplicate_request_id(p->id()); - } - return isDuplicate; -} - } // namespace namespace lsst::qserv::replica { @@ -179,17 +155,6 @@ void WorkerProcessor::enqueueForReplication(string const& id, int32_t priority, replica::Lock lock(_mtx, _context(__func__)); - // Verify a scope of the request to ensure it won't duplicate or interfere (with) - // existing requests in the active (non-completed) queues. A reason why we're ignoring - // the completed is that this replica may have already been deleted from this worker. - - for (auto&& ptr : _newRequests) { - if (::ifDuplicateRequest(response, ptr, request)) return; - } - for (auto&& entry : _inProgressRequests) { - if (::ifDuplicateRequest(response, entry.second, request)) return; - } - // The code below may catch exceptions if other parameters of the request // won't pass further validation against the present configuration of the request // processing service. @@ -219,17 +184,6 @@ void WorkerProcessor::enqueueForDeletion(string const& id, int32_t priority, replica::Lock lock(_mtx, _context(__func__)); - // Verify a scope of the request to ensure it won't duplicate or interfere (with) - // existing requests in the active (non-completed) queues. A reason why we're ignoring - // the completed is that this replica may have already been deleted from this worker. - - for (auto&& ptr : _newRequests) { - if (::ifDuplicateRequest(response, ptr, request)) return; - } - for (auto&& entry : _inProgressRequests) { - if (::ifDuplicateRequest(response, entry.second, request)) return; - } - // The code below may catch exceptions if other parameters of the request // won't pass further validation against the present configuration of the request // processing service.