Skip to content

Commit

Permalink
migration: new MigrationModule option and copy completion message
Browse files Browse the repository at this point in the history
**Motivation** allow modified replication behavior to support hotspot
mitigation. Specifically we need replication jobs to complete without
error when the number of replicas is maximized with respect to the
available pools, even if more replicas were requested.

**Modification:**

* In `Job.java`: `Error` -> `Note`.

  The names of all related methods are changed accordingly
  (e.g. `addError()` -> `addNote()`).

* `MigrationModule` gets new option `-target-deficit-mode=(wait|limit)`
  (default `wait`) determining behavior when requested replicas exceeds
  target pool availability:

    * `wait`

        The job will wait for new pools to become available (e.g. added
        to specified pool group). This is the default.

    * `limit`

        If more replicas are requested than exist available pools, the
        job will complete successfully (with a note) when the number of
        replicas is maximized for the pools currently available.

* In interface `TaskCompletionHandler`: new method
  `taskCompletedWithNote(Task, String)` with default implementation
  forwarding to `taskCompleted(Task)`.

* New (manually-generated) files `Task_sm.dot`, and `Task_sm.dot.png`
  showing the state machine model encoded by `Task.sm`.

* New method in `Task.java`: `moreReplicasPossible()` for use in state
  transition guards.

* New finite state machine (FSM) action
  `notifyCompletedWithInsufficientReplicas()` in `Task.java`.

* In `Task.sm`, `Done` entry action `notifyCompleted()` removed in favor
  of per-transition actions `notifyCompleted()`, or
  `notifyCompletedWithInsufficientReplicas()` as appropriate. It was
  determined by analysis that moving the post-state-change entry action
  to a pre-state-change transition did not disrupt the intended
  semantics of the state machine.

* New guarded transitions to `Done` when more replicas are requested
  than are possible with the target pools available currently.

*  `Task_sm.dot`, and `Task_sm.dot.png` updated manually to reflect the
   changes to the state machine.

**Result:**

* Partially implements #7675

* Replication jobs can be issued with `-target-deficit-mode=limit` to
  terminate successfully when all pools available currently have been
  populated with replicas, but more replicas were requested.

**Target:** master
**Request:** -
**Patch:** https://rb.dcache.org/r/14362/
**Closes:**
**Requires-notes:** yes
**Requires-book:**
**Acked-by:** Tigran Mkrtchyan, Dmitry Litvintsev
  • Loading branch information
greenc-FNAL committed Dec 20, 2024
1 parent bfbf240 commit f41cd3e
Show file tree
Hide file tree
Showing 11 changed files with 518 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ private synchronized void createTask(PoolManagerPoolInformation targetInfo, Stri
false, // compute checksum on update; should not happen
false, // force copy even if pool is not readable
true, // maintain atime
1); // only one copy per task
1, // only one copy per task
true); // wait for new targets if necessary

createTask(taskParameters, source);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,8 @@ public Task handleMakeOneCopy(FileAttributes attributes) {
false, // compute checksum on update; should not happen
false, // force copy even if pool not readable
true, // maintain atime
1);
1, // only one copy per task
true); // wait for new targets if necessary

Task task = new Task(taskParameters, completionHandler, source, pnfsId,
ReplicaState.CACHED, ONLINE_STICKY_RECORD,
Expand Down
46 changes: 29 additions & 17 deletions modules/dcache/src/main/java/org/dcache/pool/migration/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ enum State {
private final Set<PnfsId> _queued = new LinkedHashSet<>();
private final Map<PnfsId, Long> _sizes = new HashMap<>();
private final Map<PnfsId, Task> _running = new HashMap<>();
private final BlockingQueue<Error> _errors = new ArrayBlockingQueue<>(15);
private final BlockingQueue<Note> _notes = new ArrayBlockingQueue<>(15);
private final Map<PoolMigrationJobCancelMessage, DelayedReply> _cancelRequests =
new HashMap<>();

Expand Down Expand Up @@ -109,7 +109,7 @@ public Job(MigrationContext context, JobDefinition definition) {
context.getExecutor(), definition.selectionStrategy,
definition.poolList, definition.isEager, definition.isMetaOnly,
definition.computeChecksumOnUpdate, definition.forceSourceMode,
definition.maintainAtime, definition.replicas);
definition.maintainAtime, definition.replicas, definition.waitForTargets);

_pinPrefix = context.getPinManagerStub().getDestinationPath().getDestinationAddress()
.getCellName();
Expand Down Expand Up @@ -189,11 +189,11 @@ public void setConcurrency(int concurrency) {
}
}

public void addError(Error error) {
public void addNote(Note note) {
_lock.lock();
try {
while (!_errors.offer(error)) {
_errors.poll();
while (!_notes.offer(note)) {
_notes.poll();
}
} finally {
_lock.unlock();
Expand Down Expand Up @@ -255,10 +255,10 @@ public void getInfo(PrintWriter pw) {
task.getInfo(pw);
}

if (!_errors.isEmpty()) {
pw.println("Most recent errors:");
for (Error error : _errors) {
pw.println(error);
if (!_notes.isEmpty()) {
pw.println("Most recent notes:");
for (Note note : _notes) {
pw.println(note);
}
}
} finally {
Expand Down Expand Up @@ -516,7 +516,7 @@ private void schedule() {

PnfsId pnfsId = i.next();
if (!_context.lock(pnfsId)) {
addError(new Error(0, pnfsId, "File is locked"));
addNote(new Note(0, pnfsId, "File is locked"));
continue;
}

Expand Down Expand Up @@ -742,7 +742,7 @@ public void taskFailed(Task task, int rc, String msg) {
schedule();
}

addError(new Error(task.getId(), pnfsId, msg));
addNote(new Note(task.getId(), pnfsId, msg));
} finally {
_lock.unlock();
}
Expand All @@ -761,7 +761,7 @@ public void taskFailedPermanently(Task task, int rc, String msg) {
_context.unlock(pnfsId);
schedule();

addError(new Error(task.getId(), pnfsId, msg));
addNote(new Note(task.getId(), pnfsId, msg));
} finally {
_lock.unlock();
}
Expand All @@ -785,6 +785,18 @@ public void taskCompleted(Task task) {
}
}

@Override
public void taskCompletedWithNote(Task task, String msg) {
_lock.lock();
try {
taskCompleted(task);

addNote(new Note(task.getId(), task.getPnfsId(), msg));
} finally {
_lock.unlock();
}
}

public Object messageArrived(CellMessage envelope, PoolMigrationJobCancelMessage message) {
DelayedReply reply = new DelayedReply();
_lock.lock();
Expand Down Expand Up @@ -939,23 +951,23 @@ public boolean evaluateLifetimePredicate(Expression expression) {
return expression.evaluateBoolean(symbols);
}

protected static class Error {
protected static class Note {

private final long _id;
private final long _time;
private final PnfsId _pnfsId;
private final String _error;
private final String _note;

public Error(long id, PnfsId pnfsId, String error) {
public Note(long id, PnfsId pnfsId, String note) {
_id = id;
_time = System.currentTimeMillis();
_pnfsId = pnfsId;
_error = error;
_note = note;
}

public String toString() {
return String.format("%tT [%d] %s: %s",
_time, _id, _pnfsId, _error);
_time, _id, _pnfsId, _note);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,12 @@ public class JobDefinition {
*/
public final int replicas;

/**
* Whether to wait for targets to become available to satisfy number of replicas.
*/
public final boolean waitForTargets;


public JobDefinition(Predicate<CacheEntry> filter,
CacheEntryMode sourceMode,
CacheEntryMode targetMode,
Expand All @@ -120,7 +126,8 @@ public JobDefinition(Predicate<CacheEntry> filter,
boolean maintainAtime,
Expression pauseWhen,
Expression stopWhen,
boolean forceSourceMode) {
boolean forceSourceMode,
boolean waitForTargets) {
this.filter = filter;
this.sourceMode = sourceMode;
this.targetMode = targetMode;
Expand All @@ -139,5 +146,6 @@ public JobDefinition(Predicate<CacheEntry> filter,
this.pauseWhen = pauseWhen;
this.stopWhen = stopWhen;
this.forceSourceMode = forceSourceMode;
this.waitForTargets = waitForTargets;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,16 @@ public class MigrationCopyCommand implements Callable<String> {
usage = "Enables the transfer of files from a disabled pool.")
boolean forceSourceMode;

@Option(name = "target-deficit-mode", metaVar = "tdmode",
values = { "wait", "limit" },
category = "Transfer options",
usage = "Behaviour when requested replicas exceeds available targets:\n" +
"wait:\n" +
" wait for new targets to become available.\n" +
"limit:\n" +
" limit the number of replicas to that of the currently-available targets.\n")
String targetDeficitMode = "wait";

@Argument(metaVar = "target",
required = false,
usage = "Required unless -target=pgroup is supplied, in which case we" +
Expand Down Expand Up @@ -838,7 +848,8 @@ public String call() throws IllegalArgumentException {
maintainAtime,
createLifetimePredicate(pauseWhen),
createLifetimePredicate(stopWhen),
forceSourceMode);
forceSourceMode,
targetDeficitMode.equals("wait"));

if (definition.targetMode.state == CacheEntryMode.State.DELETE
|| definition.targetMode.state == CacheEntryMode.State.REMOVABLE) {
Expand Down
14 changes: 14 additions & 0 deletions modules/dcache/src/main/java/org/dcache/pool/migration/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,10 @@ synchronized boolean needsMoreReplicas() {
return _replicas.size() < _parameters.replicas;
}

synchronized boolean moreReplicasPossible() {
return _parameters.waitForTargets || (_replicas.size() < _parameters.poolList.getPools().size());
}

/**
* FSM Action
*/
Expand Down Expand Up @@ -347,6 +351,16 @@ void notifyCompleted() {
new FireAndForgetTask(() -> _callbackHandler.taskCompleted(Task.this)));
}

/**
* FSM Action
*/
void notifyCompletedWithInsufficientReplicas() {
_parameters.executor.execute(
new FireAndForgetTask(() -> _callbackHandler.taskCompletedWithNote(Task.this,
String.format("File replicas truncated at %s due to pool availability (%s requested)",
_replicas.size(), _parameters.replicas))));
}

/**
* FSM Action
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,5 @@ public interface TaskCompletionHandler {
* The task completed without error.
*/
void taskCompleted(Task task);
default void taskCompletedWithNote(Task task, String msg) { taskCompleted(task); }
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,16 @@ public class TaskParameters {
*/
public final int replicas;

/**
* Whether to wait for targets to become available to satisfy number of replicas.
*/
public final boolean waitForTargets;

public TaskParameters(CellStub pool, CellStub pnfs, CellStub pinManager,
ScheduledExecutorService executor,
PoolSelectionStrategy selectionStrategy, RefreshablePoolList poolList, boolean isEager,
boolean isMetaOnly, boolean computeChecksumOnUpdate, boolean forceSourceMode,
boolean maintainAtime, int replicas) {
boolean maintainAtime, int replicas, boolean waitForTargets) {
this.pool = pool;
this.pnfs = pnfs;
this.pinManager = pinManager;
Expand All @@ -105,5 +110,6 @@ public TaskParameters(CellStub pool, CellStub pnfs, CellStub pinManager,
this.forceSourceMode = forceSourceMode;
this.maintainAtime = maintainAtime;
this.replicas = replicas;
this.waitForTargets = waitForTargets;
}
}
33 changes: 32 additions & 1 deletion modules/dcache/src/main/smc/org/dcache/pool/migration/Task.sm
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ Entry
messageArrived(message: PoolMigrationCopyFinishedMessage)
Done
{
notifyCompleted();
}

}
Expand Down Expand Up @@ -266,6 +267,7 @@ Entry
messageArrived(message: PoolMigrationCopyFinishedMessage)
Done
{
notifyCompleted();
}
}

Expand Down Expand Up @@ -359,6 +361,12 @@ Exit
UpdatingExistingFile
{
}
messageArrived(message: PoolMigrationCopyFinishedMessage)
[ ctxt.needsMoreReplicas() && ! (ctxt.moreReplicasPossible() || ctxt.isMetaOnly()) ]
Done
{
notifyCompletedWithInsufficientReplicas();
}
messageArrived(message: PoolMigrationCopyFinishedMessage)
[ ctxt.needsMoreReplicas() && !ctxt.isMetaOnly() ]
InitiatingCopy
Expand All @@ -378,6 +386,7 @@ Exit
messageArrived(message: PoolMigrationCopyFinishedMessage)
Done
{
notifyCompleted();
}
cancel
Cancelling
Expand Down Expand Up @@ -435,6 +444,12 @@ Entry
UpdatingExistingFile
{
}
messageArrived(message: PoolMigrationCopyFinishedMessage)
[ ctxt.needsMoreReplicas() && ! (ctxt.moreReplicasPossible() || ctxt.isMetaOnly()) ]
Done
{
notifyCompletedWithInsufficientReplicas();
}
messageArrived(message: PoolMigrationCopyFinishedMessage)
[ ctxt.needsMoreReplicas() && !ctxt.isMetaOnly() ]
InitiatingCopy
Expand All @@ -454,6 +469,7 @@ Entry
messageArrived(message: PoolMigrationCopyFinishedMessage)
Done
{
notifyCompleted();
}
cancel
Cancelling
Expand Down Expand Up @@ -527,6 +543,12 @@ Exit
UpdatingExistingFile
{
}
messageArrived(message: PoolMigrationCopyFinishedMessage)
[ ctxt.needsMoreReplicas() && ! (ctxt.moreReplicasPossible() || ctxt.isMetaOnly()) ]
Done
{
notifyCompletedWithInsufficientReplicas();
}
messageArrived(message: PoolMigrationCopyFinishedMessage)
[ ctxt.needsMoreReplicas() && !ctxt.isMetaOnly() ]
InitiatingCopy
Expand All @@ -546,6 +568,7 @@ Exit
messageArrived(message: PoolMigrationCopyFinishedMessage)
Done
{
notifyCompleted();
}
cancel
Cancelling
Expand Down Expand Up @@ -595,6 +618,12 @@ Exit
UpdatingExistingFile
{
}
messageArrived(message: PoolMigrationCopyFinishedMessage)
[ ctxt.needsMoreReplicas() && ! (ctxt.moreReplicasPossible() || ctxt.isMetaOnly()) ]
Done
{
notifyCompletedWithInsufficientReplicas();
}
messageArrived(message: PoolMigrationCopyFinishedMessage)
[ ctxt.needsMoreReplicas() && !ctxt.isMetaOnly() ]
InitiatingCopy
Expand All @@ -614,6 +643,7 @@ Exit
messageArrived(message: PoolMigrationCopyFinishedMessage)
Done
{
notifyCompleted();
}
}

Expand All @@ -626,6 +656,7 @@ Entry
move_success
Done
{
notifyCompleted();
}
move_failure(rc: Integer, cause: Object)
Failed
Expand Down Expand Up @@ -689,6 +720,7 @@ Exit
[ message.getReturnCode() == 0 ]
Done
{
notifyCompleted();
}
cancel_success
nil
Expand Down Expand Up @@ -731,7 +763,6 @@ Failed
Done
Entry
{
notifyCompleted();
}
{
}
Expand Down
Loading

0 comments on commit f41cd3e

Please sign in to comment.