Skip to content

Commit

Permalink
Eliminated the unused Controller's function of tracking duplicate req…
Browse files Browse the repository at this point in the history
…uests

This function that was provisioned in the original design of the Replication
System has shown no benefits. Besides, the "duplicate" requests can't be
reliably detected since the detection algorithm is time dependent.
  • Loading branch information
iagaponenko committed Dec 10, 2024
1 parent 064c5da commit 2ea3786
Show file tree
Hide file tree
Showing 24 changed files with 38 additions and 174 deletions.
10 changes: 2 additions & 8 deletions src/replica/apps/ControllerApp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,6 @@ void ControllerApp::_configureParser() {
_cancelDelayMilliseconds)
.option("priority", "The priority level of a request", _priority)
.flag("do-not-track", "Do not track requests by waiting before they finish.", _doNotTrackRequest)
.flag("allow-duplicates",
"Allow requests which duplicate the previously made one. This applies"
" to requests which change the replica disposition at a worker, and only"
" for those requests which are still in the worker's queues.",
_allowDuplicates)
.flag("do-not-save-replica",
"The flag which (if used) prevents the application from saving replica info in a database."
" This may significantly speed up the application in setups where the number of chunks is "
Expand Down Expand Up @@ -415,11 +410,10 @@ int ControllerApp::runImpl() {
request = ReplicationRequest::createAndStart(
controller, _workerName, _sourceWorkerName, _databaseName, _chunkNumber,
[](ReplicationRequest::Ptr const& request_) { request_->print(); }, _priority,
!_doNotTrackRequest, _allowDuplicates);
!_doNotTrackRequest);
} else if ("DELETE" == _requestType) {
request = DeleteRequest::createAndStart(controller, _workerName, _databaseName, _chunkNumber,
Request::defaultPrinter, _priority, !_doNotTrackRequest,
_allowDuplicates);
Request::defaultPrinter, _priority, !_doNotTrackRequest);
} else if ("FIND" == _requestType) {
request = FindRequest::createAndStart(controller, _workerName, _databaseName, _chunkNumber,
Request::defaultPrinter, _priority, _computeCheckSum,
Expand Down
5 changes: 0 additions & 5 deletions src/replica/apps/ControllerApp.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,6 @@ class ControllerApp : public Application {
/// Do not track requests waiting before they finish
bool _doNotTrackRequest = false;

/// Allow requests which duplicate the previously made one. This applies
/// to requests which change the replica disposition at a worker, and only
/// for those requests which are still in the worker's queues.
bool _allowDuplicates = false;

/// Do not save the replica info in the database if set to 'true'
bool _doNotSaveReplicaInfo = false;

Expand Down
3 changes: 1 addition & 2 deletions src/replica/jobs/CreateReplicaJob.cc
Original file line number Diff line number Diff line change
Expand Up @@ -203,14 +203,13 @@ void CreateReplicaJob::startImpl(replica::Lock const& lock) {
// VERY IMPORTANT: the requests are sent for participating databases
// only because some catalogs may not have a full coverage
bool const keepTracking = true;
bool const allowDuplicate = true;
for (auto&& replica : sourceReplicas) {
_requests.push_back(ReplicationRequest::createAndStart(
controller(), destinationWorker(), sourceWorker(), replica.database(), chunk(),
[self = shared_from_base<CreateReplicaJob>()](ReplicationRequest::Ptr ptr) {
self->_onRequestFinish(ptr);
},
priority(), keepTracking, allowDuplicate, id()));
priority(), keepTracking, id()));
}
}

Expand Down
3 changes: 1 addition & 2 deletions src/replica/jobs/DeleteReplicaJob.cc
Original file line number Diff line number Diff line change
Expand Up @@ -235,14 +235,13 @@ void DeleteReplicaJob::_beginDeleteReplica(replica::Lock const& lock) {
// VERY IMPORTANT: the requests are sent for participating databases
// only because some catalogs may not have a full coverage
bool const keepTracking = true;
bool const allowDuplicate = true;
for (auto&& replica : _replicas) {
_requests.push_back(DeleteRequest::createAndStart(
controller(), workerName(), replica.database(), chunk(),
[self = shared_from_base<DeleteReplicaJob>()](DeleteRequest::Ptr ptr) {
self->_onRequestFinish(ptr);
},
priority(), keepTracking, allowDuplicate, id()));
priority(), keepTracking, id()));
}
}

Expand Down
3 changes: 1 addition & 2 deletions src/replica/jobs/FixUpJob.cc
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,6 @@ size_t FixUpJob::_launchNext(replica::Lock const& lock, string const& destinatio
if (maxRequests == 0) return 0;
auto&& tasks = _destinationWorker2tasks[destinationWorker];
bool const keepTracking = true;
bool const allowDuplicate = true;
size_t numLaunched = 0;
for (size_t i = 0; i < maxRequests; ++i) {
if (tasks.size() == 0) break;
Expand All @@ -270,7 +269,7 @@ size_t FixUpJob::_launchNext(replica::Lock const& lock, string const& destinatio
[self = shared_from_base<FixUpJob>()](ReplicationRequest::Ptr ptr) {
self->_onRequestFinish(ptr);
},
priority(), keepTracking, allowDuplicate, id()));
priority(), keepTracking, id()));
tasks.pop();
numLaunched++;
}
Expand Down
7 changes: 0 additions & 7 deletions src/replica/proto/protocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,6 @@ enum ProtocolStatusExt {
NONE = 0; // unspecified problem
INVALID_PARAM = 1; // invalid parameter(s) of a request
INVALID_ID = 2; // an invalid request identifier
DUPLICATE = 3; // a duplicate request
FOLDER_STAT = 4; // failed to obtain fstat() for a folder
FOLDER_CREATE = 5; // failed to create a folder
FILE_STAT = 6; // failed to obtain fstat() for a file
Expand Down Expand Up @@ -487,9 +486,6 @@ message ProtocolResponseReplicate {
/// Extended status of this operation
optional ProtocolStatusExt status_ext = 2 [default = NONE];

/// The field is set for duplicate requests only
optional string duplicate_request_id = 3 [default = ""];

/// The performance of this operation
required ProtocolPerformance performance = 4;

Expand Down Expand Up @@ -519,9 +515,6 @@ message ProtocolResponseDelete {
/// Extended status of this operation
optional ProtocolStatusExt status_ext = 2 [default = NONE];

/// The field is set for duplicate requests only
optional string duplicate_request_id = 3 [default = ""];

/// The performance of this operation
required ProtocolPerformance performance = 4;

Expand Down
23 changes: 7 additions & 16 deletions src/replica/requests/DeleteRequest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,18 @@ namespace lsst::qserv::replica {
DeleteRequest::Ptr DeleteRequest::createAndStart(shared_ptr<Controller> const& controller,
string const& workerName, string const& database,
unsigned int chunk, CallbackType const& onFinish,
int priority, bool keepTracking, bool allowDuplicate,
string const& jobId, unsigned int requestExpirationIvalSec) {
auto ptr = DeleteRequest::Ptr(new DeleteRequest(controller, workerName, database, chunk, onFinish,
priority, keepTracking, allowDuplicate));
int priority, bool keepTracking, string const& jobId,
unsigned int requestExpirationIvalSec) {
auto ptr = DeleteRequest::Ptr(
new DeleteRequest(controller, workerName, database, chunk, onFinish, priority, keepTracking));
ptr->start(jobId, requestExpirationIvalSec);
return ptr;
}

DeleteRequest::DeleteRequest(shared_ptr<Controller> const& controller, string const& workerName,
string const& database, unsigned int chunk, CallbackType const& onFinish,
int priority, bool keepTracking, bool allowDuplicate)
: RequestMessenger(controller, "REPLICA_DELETE", workerName, priority, keepTracking, allowDuplicate,
int priority, bool keepTracking)
: RequestMessenger(controller, "REPLICA_DELETE", workerName, priority, keepTracking,
::disposeRequired),
_database(database),
_chunk(chunk),
Expand Down Expand Up @@ -117,7 +117,7 @@ void DeleteRequest::awaken(boost::system::error_code const& ec) {
buffer()->serialize(hdr);

ProtocolRequestTrack message;
message.set_id(remoteId());
message.set_id(id());
message.set_queued_type(ProtocolQueuedRequestType::REPLICA_DELETE);
buffer()->serialize(message);

Expand Down Expand Up @@ -194,15 +194,6 @@ void DeleteRequest::_analyze(bool success, ProtocolResponseDelete const& message
break;

case ProtocolStatus::BAD:
// Special treatment of the duplicate requests if allowed
if (extendedServerStatus() == ProtocolStatusExt::DUPLICATE) {
setDuplicateRequestId(lock, message.duplicate_request_id());
if (allowDuplicate() && keepTracking()) {
timer().expires_from_now(boost::posix_time::milliseconds(nextTimeIvalMsec()));
timer().async_wait(bind(&DeleteRequest::awaken, shared_from_base<DeleteRequest>(), _1));
return;
}
}
finish(lock, SERVER_BAD);
break;

Expand Down
6 changes: 3 additions & 3 deletions src/replica/requests/DeleteRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ class DeleteRequest : public RequestMessenger {
static Ptr createAndStart(std::shared_ptr<Controller> const& controller, std::string const& workerName,
std::string const& database, unsigned int chunk,
CallbackType const& onFinish = nullptr, int priority = PRIORITY_NORMAL,
bool keepTracking = true, bool allowDuplicate = true,
std::string const& jobId = "", unsigned int requestExpirationIvalSec = 0);
bool keepTracking = true, std::string const& jobId = "",
unsigned int requestExpirationIvalSec = 0);

protected:
void startImpl(replica::Lock const& lock) final;
Expand All @@ -101,7 +101,7 @@ class DeleteRequest : public RequestMessenger {
private:
DeleteRequest(std::shared_ptr<Controller> const& controller, std::string const& workerName,
std::string const& database, unsigned int chunk, CallbackType const& onFinish, int priority,
bool keepTracking, bool allowDuplicate);
bool keepTracking);

/**
* Send the serialized content of the buffer to a worker.
Expand Down
4 changes: 1 addition & 3 deletions src/replica/requests/DirectorIndexRequest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ namespace fs = boost::filesystem;

namespace {
LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.DirectorIndexRequest");
bool const allowDuplicateNo = false;
bool const disposeRequired = true;
} // namespace

Expand Down Expand Up @@ -77,8 +76,7 @@ DirectorIndexRequest::DirectorIndexRequest(std::shared_ptr<Controller> const& co
string const& directorTable, unsigned int chunk,
bool hasTransactions, TransactionId transactionId,
CallbackType const& onFinish, int priority, bool keepTracking)
: RequestMessenger(controller, "INDEX", workerName, priority, keepTracking, ::allowDuplicateNo,
::disposeRequired),
: RequestMessenger(controller, "INDEX", workerName, priority, keepTracking, ::disposeRequired),
_database(database),
_directorTable(directorTable),
_chunk(chunk),
Expand Down
4 changes: 1 addition & 3 deletions src/replica/requests/DisposeRequest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ using namespace std::placeholders;
namespace {

LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.DisposeRequest");
bool const allowDuplicateNo = false;
bool const disposeRequiredNo = false;
} // namespace

Expand Down Expand Up @@ -83,8 +82,7 @@ DisposeRequest::Ptr DisposeRequest::createAndStart(shared_ptr<Controller> const&
DisposeRequest::DisposeRequest(shared_ptr<Controller> const& controller, string const& workerName,
std::vector<std::string> const& targetIds, CallbackType const& onFinish,
int priority, bool keepTracking)
: RequestMessenger(controller, "DISPOSE", workerName, priority, keepTracking, ::allowDuplicateNo,
::disposeRequiredNo),
: RequestMessenger(controller, "DISPOSE", workerName, priority, keepTracking, ::disposeRequiredNo),
_targetIds(targetIds),
_onFinish(onFinish) {}

Expand Down
4 changes: 1 addition & 3 deletions src/replica/requests/EchoRequest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ using namespace std::placeholders;

namespace {
LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.EchoRequest");
bool const allowDuplicateNo = false;
bool const disposeRequired = true;
} // namespace

Expand All @@ -65,8 +64,7 @@ EchoRequest::Ptr EchoRequest::createAndStart(shared_ptr<Controller> const& contr
EchoRequest::EchoRequest(shared_ptr<Controller> const& controller, string const& workerName,
string const& data, uint64_t delay, CallbackType const& onFinish, int priority,
bool keepTracking)
: RequestMessenger(controller, "TEST_ECHO", workerName, priority, keepTracking, ::allowDuplicateNo,
::disposeRequired),
: RequestMessenger(controller, "TEST_ECHO", workerName, priority, keepTracking, ::disposeRequired),
_data(data),
_delay(delay),
_onFinish(onFinish) {}
Expand Down
3 changes: 1 addition & 2 deletions src/replica/requests/FindAllRequest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ using namespace std::placeholders;

namespace {
LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.FindAllRequest");
bool const allowDuplicateNo = false;
bool const disposeRequired = true;
} // namespace

Expand All @@ -66,7 +65,7 @@ FindAllRequest::FindAllRequest(shared_ptr<Controller> const& controller, string
string const& database, bool saveReplicaInfo, CallbackType const& onFinish,
int priority, bool keepTracking)
: RequestMessenger(controller, "REPLICA_FIND_ALL", workerName, priority, keepTracking,
::allowDuplicateNo, ::disposeRequired),
::disposeRequired),
_database(database),
_saveReplicaInfo(saveReplicaInfo),
_onFinish(onFinish) {
Expand Down
4 changes: 1 addition & 3 deletions src/replica/requests/FindRequest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ using namespace std::placeholders;

namespace {
LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.FindRequest");
bool const allowDuplicateNo = false;
bool const disposeRequired = true;
} // namespace

Expand All @@ -66,8 +65,7 @@ FindRequest::Ptr FindRequest::createAndStart(shared_ptr<Controller> const& contr
FindRequest::FindRequest(shared_ptr<Controller> const& controller, string const& workerName,
string const& database, unsigned int chunk, CallbackType const& onFinish,
int priority, bool computeCheckSum, bool keepTracking)
: RequestMessenger(controller, "REPLICA_FIND", workerName, priority, keepTracking, ::allowDuplicateNo,
::disposeRequired),
: RequestMessenger(controller, "REPLICA_FIND", workerName, priority, keepTracking, ::disposeRequired),
_database(database),
_chunk(chunk),
_computeCheckSum(computeCheckSum),
Expand Down
23 changes: 6 additions & 17 deletions src/replica/requests/ReplicationRequest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,18 @@ namespace lsst::qserv::replica {
ReplicationRequest::Ptr ReplicationRequest::createAndStart(
shared_ptr<Controller> const& controller, string const& workerName, string const& sourceWorkerName,
string const& database, unsigned int chunk, CallbackType const& onFinish, int priority,
bool keepTracking, bool allowDuplicate, string const& jobId, unsigned int requestExpirationIvalSec) {
auto ptr = ReplicationRequest::Ptr(new ReplicationRequest(controller, workerName, sourceWorkerName,
database, chunk, onFinish, priority,
keepTracking, allowDuplicate));
bool keepTracking, string const& jobId, unsigned int requestExpirationIvalSec) {
auto ptr = ReplicationRequest::Ptr(new ReplicationRequest(
controller, workerName, sourceWorkerName, database, chunk, onFinish, priority, keepTracking));
ptr->start(jobId, requestExpirationIvalSec);
return ptr;
}

ReplicationRequest::ReplicationRequest(shared_ptr<Controller> const& controller, string const& workerName,
string const& sourceWorkerName, string const& database,
unsigned int chunk, CallbackType const& onFinish, int priority,
bool keepTracking, bool allowDuplicate)
: RequestMessenger(controller, "REPLICA_CREATE", workerName, priority, keepTracking, allowDuplicate,
bool keepTracking)
: RequestMessenger(controller, "REPLICA_CREATE", workerName, priority, keepTracking,
::disposeRequired),
_database(database),
_chunk(chunk),
Expand Down Expand Up @@ -127,7 +126,7 @@ void ReplicationRequest::awaken(boost::system::error_code const& ec) {
buffer()->serialize(hdr);

ProtocolRequestTrack message;
message.set_id(remoteId());
message.set_id(id());
message.set_queued_type(ProtocolQueuedRequestType::REPLICA_CREATE);
buffer()->serialize(message);

Expand Down Expand Up @@ -197,16 +196,6 @@ void ReplicationRequest::_analyze(bool success, ProtocolResponseReplicate const&
keepTrackingOrFinish(lock, SERVER_IS_CANCELLING);
break;
case ProtocolStatus::BAD:
// Special treatment of the duplicate requests if allowed
if (extendedServerStatus() == ProtocolStatusExt::DUPLICATE) {
setDuplicateRequestId(lock, message.duplicate_request_id());
if (allowDuplicate() && keepTracking()) {
timer().expires_from_now(boost::posix_time::milliseconds(nextTimeIvalMsec()));
timer().async_wait(
bind(&ReplicationRequest::awaken, shared_from_base<ReplicationRequest>(), _1));
return;
}
}
finish(lock, SERVER_BAD);
break;
case ProtocolStatus::FAILED:
Expand Down
5 changes: 2 additions & 3 deletions src/replica/requests/ReplicationRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,7 @@ class ReplicationRequest : public RequestMessenger {
std::string const& sourceWorkerName, std::string const& database,
unsigned int chunk, CallbackType const& onFinish = nullptr,
int priority = PRIORITY_NORMAL, bool keepTracking = true,
bool allowDuplicate = true, std::string const& jobId = "",
unsigned int requestExpirationIvalSec = 0);
std::string const& jobId = "", unsigned int requestExpirationIvalSec = 0);

/// @see Request::extendedPersistentState()
std::list<std::pair<std::string, std::string>> extendedPersistentState() const override;
Expand All @@ -109,7 +108,7 @@ class ReplicationRequest : public RequestMessenger {
private:
ReplicationRequest(std::shared_ptr<Controller> const& controller, std::string const& workerName,
std::string const& sourceWorkerName, std::string const& database, unsigned int chunk,
CallbackType const& onFinish, int priority, bool keepTracking, bool allowDuplicate);
CallbackType const& onFinish, int priority, bool keepTracking);

/**
* Send the serialized content of the buffer to a worker
Expand Down
Loading

0 comments on commit 2ea3786

Please sign in to comment.