Skip to content

Commit

Permalink
Master Controller's option to manage replica synchroization at Qserv …
Browse files Browse the repository at this point in the history
…workers
  • Loading branch information
iagaponenko committed Dec 5, 2024
1 parent 0e0aa56 commit 80bdb86
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 13 deletions.
6 changes: 5 additions & 1 deletion src/replica/apps/MasterControllerHttpApp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ struct {
unsigned int const workerReconfigTimeoutSec = 600;

bool const purge = false;
bool const allowQservSync = true;
bool const forceQservSync = false;
bool const permanentDelete = false;

Expand Down Expand Up @@ -111,6 +112,7 @@ MasterControllerHttpApp::MasterControllerHttpApp(int argc, char* argv[])
_qservSyncTimeoutSec(::defaultOptions.qservSyncTimeoutSec),
_workerReconfigTimeoutSec(::defaultOptions.workerReconfigTimeoutSec),
_purge(::defaultOptions.purge),
_allowQservSync(::defaultOptions.allowQservSync),
_forceQservSync(::defaultOptions.forceQservSync),
_permanentDelete(::defaultOptions.permanentDelete),
_qservCzarDbUrl(Configuration::qservCzarDbUrl()) {
Expand Down Expand Up @@ -155,6 +157,8 @@ MasterControllerHttpApp::MasterControllerHttpApp(int argc, char* argv[])
" would override the corresponding parameter specified"
" in the Configuration.",
_workerReconfigTimeoutSec);
parser().flag("allow-qserv-sync", "The flag which would allow replica synchroization at Qserv workers.",
_allowQservSync);
parser().flag("qserv-sync-force",
"The flag which would force Qserv workers to update their list of replicas"
" even if some of the chunk replicas were still in use by on-going queries."
Expand Down Expand Up @@ -212,7 +216,7 @@ int MasterControllerHttpApp::runImpl() {

_replicationTask = ReplicationTask::create(
_controller, [self](Task::Ptr const& ptr) { self->_isFailed.fail(); }, _qservSyncTimeoutSec,
_forceQservSync, _replicationIntervalSec, _purge);
_allowQservSync, _forceQservSync, _replicationIntervalSec, _purge);
_replicationTask->start();

_healthMonitorTask = HealthMonitorTask::create(
Expand Down
1 change: 1 addition & 0 deletions src/replica/apps/MasterControllerHttpApp.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ class MasterControllerHttpApp : public Application {
unsigned int _workerReconfigTimeoutSec;

bool _purge;
bool _allowQservSync;
bool _forceQservSync;
bool _permanentDelete;

Expand Down
22 changes: 12 additions & 10 deletions src/replica/contr/ReplicationTask.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@ namespace lsst::qserv::replica {

ReplicationTask::Ptr ReplicationTask::create(Controller::Ptr const& controller,
Task::AbnormalTerminationCallbackType const& onTerminated,
unsigned int qservSyncTimeoutSec, bool forceQservSync,
unsigned int replicationIntervalSec, bool purge) {
return Ptr(new ReplicationTask(controller, onTerminated, qservSyncTimeoutSec, forceQservSync,
replicationIntervalSec, purge));
unsigned int qservSyncTimeoutSec, bool allowQservSync,
bool forceQservSync, unsigned int replicationIntervalSec,
bool purge) {
return Ptr(new ReplicationTask(controller, onTerminated, qservSyncTimeoutSec, allowQservSync,
forceQservSync, replicationIntervalSec, purge));
}

bool ReplicationTask::onRun() {
Expand All @@ -49,21 +50,21 @@ bool ReplicationTask::onRun() {
serviceProvider()->config()->get<int>("controller", "catalog-management-priority-level");

launch<FindAllJob>(priority, saveReplicaInfo, allWorkers);
sync(_qservSyncTimeoutSec, _forceQservSync);
if (_allowQservSync) sync(_qservSyncTimeoutSec, _forceQservSync);

launch<FixUpJob>(priority);
sync(_qservSyncTimeoutSec, _forceQservSync);
if (_allowQservSync) sync(_qservSyncTimeoutSec, _forceQservSync);

launch<ReplicateJob>(priority, numReplicas);
sync(_qservSyncTimeoutSec, _forceQservSync);
if (_allowQservSync) sync(_qservSyncTimeoutSec, _forceQservSync);

bool const estimateOnly = false;
launch<RebalanceJob>(priority, estimateOnly);
sync(_qservSyncTimeoutSec, _forceQservSync);
if (_allowQservSync) sync(_qservSyncTimeoutSec, _forceQservSync);

if (_purge) {
launch<PurgeJob>(priority, numReplicas);
sync(_qservSyncTimeoutSec, _forceQservSync);
if (_allowQservSync) sync(_qservSyncTimeoutSec, _forceQservSync);
}

// Keep on getting calls on this method after a wait time
Expand All @@ -72,10 +73,11 @@ bool ReplicationTask::onRun() {

ReplicationTask::ReplicationTask(Controller::Ptr const& controller,
Task::AbnormalTerminationCallbackType const& onTerminated,
unsigned int qservSyncTimeoutSec, bool forceQservSync,
unsigned int qservSyncTimeoutSec, bool allowQservSync, bool forceQservSync,
unsigned int replicationIntervalSec, bool purge)
: Task(controller, "REPLICATION-THREAD ", onTerminated, replicationIntervalSec),
_qservSyncTimeoutSec(qservSyncTimeoutSec),
_allowQservSync(allowQservSync),
_forceQservSync(forceQservSync),
_purge(purge) {}

Expand Down
6 changes: 4 additions & 2 deletions src/replica/contr/ReplicationTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class ReplicationTask : public Task {
* of the task. Set it to 'nullptr' if no call back should be made.
* @param qservSyncTimeoutSec The maximum number of seconds to be waited before giving
* up on the Qserv synchronization requests.
* @param allowQservSync Allow replica synchronization at Qserv workers if 'true'.
* @param forceQservSync Force chunk removal at worker resource collections if 'true'.
* @param replicationIntervalSec The number of seconds to wait in the end of each
* iteration loop before to begin the new one.
Expand All @@ -62,7 +63,7 @@ class ReplicationTask : public Task {
*/
static Ptr create(Controller::Ptr const& controller,
Task::AbnormalTerminationCallbackType const& onTerminated,
unsigned int qservSyncTimeoutSec, bool forceQservSync,
unsigned int qservSyncTimeoutSec, bool allowQservSync, bool forceQservSync,
unsigned int replicationIntervalSec, bool purge);

protected:
Expand All @@ -72,13 +73,14 @@ class ReplicationTask : public Task {
private:
/// @see ReplicationTask::create()
ReplicationTask(Controller::Ptr const& controller, AbnormalTerminationCallbackType const& onTerminated,
unsigned int qservSyncTimeoutSec, bool forceQservSync,
unsigned int qservSyncTimeoutSec, bool allowQservSync, bool forceQservSync,
unsigned int replicationIntervalSec, bool purge);

/// The maximum number of seconds to be waited before giving up
/// on the Qserv synchronization requests.
unsigned int const _qservSyncTimeoutSec;

bool const _allowQservSync; ///< Allow replica synchroization at Qserv workers if 'true'.
bool const _forceQservSync; ///< Force removal at worker resource collections if 'true'.
bool const _purge; ///< Purge excess replicas if 'true'.
};
Expand Down

0 comments on commit 80bdb86

Please sign in to comment.