Skip to content

Commit

Permalink
more improvements for async. handling of download transfers #11057
Browse files Browse the repository at this point in the history
  • Loading branch information
landreev committed Dec 16, 2024
1 parent dab3884 commit 8bf4156
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 60 deletions.
145 changes: 93 additions & 52 deletions src/main/java/edu/harvard/iq/dataverse/globus/GlobusServiceBean.java
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ private String getRuleId(GlobusEndpoint endpoint, String principal, String permi
* @param dataset - the dataset associated with the rule
* @param globusLogger - a separate logger instance, may be null
*/
public void deletePermission(String ruleId, Dataset dataset, Logger globusLogger) {
private void deletePermission(String ruleId, Dataset dataset, Logger globusLogger) {
globusLogger.fine("Start deleting rule " + ruleId + " for dataset " + dataset.getId());
if (ruleId.length() > 0) {
if (dataset != null) {
Expand All @@ -172,7 +172,7 @@ public void deletePermission(String ruleId, Dataset dataset, Logger globusLogger
globusLogger.info("Access rule " + ruleId + " was deleted successfully");
}
} catch (MalformedURLException ex) {
logger.log(Level.WARNING,
globusLogger.log(Level.WARNING,
"Failed to delete access rule " + ruleId + " on endpoint " + endpoint.getId(), ex);
}
}
Expand Down Expand Up @@ -444,7 +444,6 @@ private void monitorTemporaryPermissions(String ruleId, long datasetId) {
* files are created in general, some calls may use the
* class logger)
* @return
* @throws MalformedURLException
*/
public GlobusTaskState getTask(String accessToken, String taskId, Logger globusLogger) {

Expand Down Expand Up @@ -730,8 +729,13 @@ public void globusUpload(JsonObject jsonData, Dataset dataset, String httpReques

String logTimestamp = logFormatter.format(startDate);
Logger globusLogger = Logger.getLogger(
"edu.harvard.iq.dataverse.upload.client.DatasetServiceBean." + "GlobusUpload" + logTimestamp);
String logFileName = System.getProperty("com.sun.aas.instanceRoot") + File.separator + "logs" + File.separator + "globusUpload_" + dataset.getId() + "_" + logTimestamp
"edu.harvard.iq.dataverse.globus.GlobusServiceBean." + "Globus"
+ GlobusTaskInProgress.TaskType.UPLOAD + logTimestamp);

String logFileName = System.getProperty("com.sun.aas.instanceRoot")
+ File.separator + "logs"
+ File.separator + "globus" + GlobusTaskInProgress.TaskType.UPLOAD + "_"
+ logTimestamp + " " + dataset.getId() + "_"
+ ".log";
FileHandler fileHandler;

Expand Down Expand Up @@ -1160,12 +1164,18 @@ private void processUploadedFiles(JsonArray filesJsonArray, Dataset dataset, Aut
public void globusDownload(JsonObject jsonObject, Dataset dataset, User authUser) throws MalformedURLException {

Date startDate = new Date();


// @todo the logger initialization method will be moved into the GlobusUtil
// eventually, for both this and the monitoring service to use
String logTimestamp = logFormatter.format(startDate);
Logger globusLogger = Logger.getLogger(
"edu.harvard.iq.dataverse.upload.client.DatasetServiceBean." + "GlobusDownload" + logTimestamp);
"edu.harvard.iq.dataverse.globus.GlobusServiceBean." + "Globus"
+ GlobusTaskInProgress.TaskType.DOWNLOAD + logTimestamp);

String logFileName = System.getProperty("com.sun.aas.instanceRoot") + File.separator + "logs" + File.separator + "globusDownload_id_" + dataset.getId() + "_" + logTimestamp
String logFileName = System.getProperty("com.sun.aas.instanceRoot")
+ File.separator + "logs"
+ File.separator + "globus" + GlobusTaskInProgress.TaskType.DOWNLOAD + "_"
+ dataset.getId() + "_" + logTimestamp
+ ".log";
FileHandler fileHandler;
boolean fileHandlerSuceeded;
Expand Down Expand Up @@ -1194,8 +1204,8 @@ public void globusDownload(JsonObject jsonObject, Dataset dataset, User authUser
// If the rules_cache times out, the permission will be deleted. Presumably that
// doesn't affect a
// globus task status check
GlobusTaskState task = getTask(endpoint.getClientToken(), taskIdentifier, globusLogger);
String ruleId = getRuleId(endpoint, task.getOwner_id(), "r");
GlobusTaskState taskState = getTask(endpoint.getClientToken(), taskIdentifier, globusLogger);
String ruleId = getRuleId(endpoint, taskState.getOwner_id(), "r");
if (ruleId != null) {
logger.fine("Found rule: " + ruleId);
Long datasetId = rulesCache.getIfPresent(ruleId);
Expand Down Expand Up @@ -1227,51 +1237,25 @@ public void globusDownload(JsonObject jsonObject, Dataset dataset, User authUser
GlobusTaskInProgress.TaskType.DOWNLOAD,
dataset,
endpoint.getClientToken(),
authUser instanceof AuthenticatedUser ? authUser : null,
authUser instanceof AuthenticatedUser ? (AuthenticatedUser)authUser : null,
ruleId,
new Timestamp(startDate.getTime()));
em.persist(taskInProgress);

if (fileHandler != null) {
fileHandler.close();
}
fileHandler.close();

// return and forget
return;
}

task = globusStatusCheck(endpoint, taskIdentifier, globusLogger);
// @todo null check?
String taskStatus = GlobusUtil.getTaskStatus(task);

// Transfer is done (success or failure) so delete the rule
if (ruleId != null) {
logger.fine("Deleting: rule: " + ruleId);
deletePermission(ruleId, dataset, globusLogger);
}

if (taskStatus.startsWith("FAILED") || taskStatus.startsWith("INACTIVE")) {
String comment = "Reason : " + taskStatus.split("#")[1] + "<br> Short Description : "
+ taskStatus.split("#")[2];
if (authUser != null && authUser instanceof AuthenticatedUser) {
userNotificationService.sendNotification((AuthenticatedUser) authUser, new Timestamp(new Date().getTime()),
UserNotification.Type.GLOBUSDOWNLOADCOMPLETEDWITHERRORS, dataset.getId(), comment, true);
}

globusLogger.info("Globus task failed during download process: "+comment);
} else if (authUser != null && authUser instanceof AuthenticatedUser) {

boolean taskSkippedFiles = (task.getSkip_source_errors() == null) ? false : task.getSkip_source_errors();
if (!taskSkippedFiles) {
userNotificationService.sendNotification((AuthenticatedUser) authUser,
new Timestamp(new Date().getTime()), UserNotification.Type.GLOBUSDOWNLOADCOMPLETED,
dataset.getId());
} else {
userNotificationService.sendNotification((AuthenticatedUser) authUser,
new Timestamp(new Date().getTime()), UserNotification.Type.GLOBUSDOWNLOADCOMPLETEDWITHERRORS,
dataset.getId(), "");
}
}
// Check again:
taskState = globusStatusCheck(endpoint, taskIdentifier, globusLogger);

processCompletedDownloadTask(taskState,
authUser instanceof AuthenticatedUser ? (AuthenticatedUser)authUser : null,
dataset,
ruleId,
globusLogger);
}

Executor executor = Executors.newFixedThreadPool(10);
Expand Down Expand Up @@ -1540,6 +1524,10 @@ public List<GlobusTaskInProgress> findAllOngoingTasks() {
return em.createQuery("select object(o) from GlobusTaskInProgress as o order by o.startTime", GlobusTaskInProgress.class).getResultList();
}

/**
 * Looks up all the Globus tasks of the given type that are still being
 * monitored, ordered by the time the transfer was started.
 *
 * @param taskType the type of task (UPLOAD or DOWNLOAD) to select
 * @return the matching in-progress tasks, oldest first
 */
public List<GlobusTaskInProgress> findAllOngoingTasks(GlobusTaskInProgress.TaskType taskType) {
    String jpql = "select object(o) from GlobusTaskInProgress as o where o.taskType=:taskType order by o.startTime";
    return em.createQuery(jpql, GlobusTaskInProgress.class)
            .setParameter("taskType", taskType)
            .getResultList();
}

public void deleteTask(GlobusTaskInProgress task) {
GlobusTaskInProgress mergedTask = em.merge(task);
em.remove(mergedTask);
Expand All @@ -1549,14 +1537,10 @@ public List<ExternalFileUploadInProgress> findExternalUploadsByTaskId(String tas
return em.createNamedQuery("ExternalFileUploadInProgress.findByTaskId").setParameter("taskId", taskId).getResultList();
}

public void processCompletedTask(GlobusTaskInProgress globusTask, boolean taskSuccess, String taskStatus, Logger taskLogger) {
public void processCompletedTask(GlobusTaskInProgress globusTask, GlobusTaskState taskState, boolean taskSuccess, String taskStatus, Logger taskLogger) {
String ruleId = globusTask.getRuleId();
Dataset dataset = globusTask.getDataset();
AuthenticatedUser authUser = globusTask.getLocalUser();
if (authUser == null) {
// @todo log error message; do nothing
return;
}

if (GlobusTaskInProgress.TaskType.UPLOAD.equals(globusTask.getTaskType())) {
List<ExternalFileUploadInProgress> fileUploadsInProgress = findExternalUploadsByTaskId(globusTask.getTaskId());
Expand All @@ -1578,10 +1562,67 @@ public void processCompletedTask(GlobusTaskInProgress globusTask, boolean taskSu
JsonArray filesJsonArray = filesJsonArrayBuilder.build();

processCompletedUploadTask(dataset, filesJsonArray, authUser, ruleId, taskLogger, taskSuccess, taskStatus);
} else if (GlobusTaskInProgress.TaskType.DOWNLOAD.equals(globusTask.getTaskType())) {

processCompletedDownloadTask(taskState, authUser, dataset, ruleId, taskLogger);

} else {
// @todo eventually, extend this async. framework to handle Globus downloads as well
logger.warning("Unknown or null TaskType passed to processCompletedTask()");
}

}

/**
 * Finalizes a completed Globus DOWNLOAD task: deletes the temporary
 * access rule that was created for the transfer, logs the outcome, and
 * notifies the user (if an authenticated user is associated with the task).
 *
 * @param taskState  the final state of the task as reported by the Globus
 *                   API; may be null if the status lookup failed
 * @param authUser   the local authenticated user to notify, or null
 *                   (sendNotification() is a no-op for a null user)
 * @param dataset    the dataset the files were downloaded from
 * @param ruleId     the Globus access rule negotiated for this transfer,
 *                   or null if none was recorded
 * @param taskLogger per-task logger (writes to the task-specific log file)
 */
private void processCompletedDownloadTask(GlobusTaskState taskState,
        AuthenticatedUser authUser,
        Dataset dataset,
        String ruleId,
        Logger taskLogger) {
    // The only thing to do on completion of a remote download
    // transfer is to delete the permission ACL that Dataverse
    // had negotiated for the user before the task was initialized:
    if (ruleId != null) {
        deletePermission(ruleId, dataset, taskLogger);
    }

    String taskStatus = GlobusUtil.getTaskStatus(taskState);

    // ... plus log the outcome and send any notifications:
    if (taskStatus.startsWith("FAILED") || taskStatus.startsWith("INACTIVE")) {
        // Outright, unambiguous failure.
        // GlobusUtil.getTaskStatus() can return a bare "FAILED" with no
        // "#"-delimited details (e.g., when taskState is null), so guard
        // the split to avoid an ArrayIndexOutOfBoundsException while
        // reporting the failure:
        String[] statusParts = taskStatus.split("#");
        String comment = (statusParts.length > 2)
                ? "Reason : " + statusParts[1] + "<br> Short Description : " + statusParts[2]
                : "Reason : " + taskStatus;
        taskLogger.info("Globus task failed during download process: " + comment);

        sendNotification(authUser, UserNotification.Type.GLOBUSDOWNLOADCOMPLETEDWITHERRORS, dataset.getId(), comment);

    } else {
        // Success, total or partial (i.e., some source files were skipped):
        boolean taskSkippedFiles = (taskState == null || taskState.getSkip_source_errors() == null)
                ? false
                : taskState.getSkip_source_errors();

        if (!taskSkippedFiles) {
            taskLogger.info("Globus task completed successfully");

            sendNotification(authUser, UserNotification.Type.GLOBUSDOWNLOADCOMPLETED, dataset.getId(), "");
        } else {
            taskLogger.info("Globus task completed with partial success (skip source errors)");

            sendNotification(authUser, UserNotification.Type.GLOBUSDOWNLOADCOMPLETEDWITHERRORS, dataset.getId(), "");
        }
    }
}

/**
 * Sends a user notification of the given type for the dataset.
 * A null user is quietly tolerated (nothing is sent) — e.g., for
 * transfers initiated by non-authenticated users.
 *
 * @param authUser  the user to notify, may be null
 * @param type      the notification type
 * @param datasetId the id of the dataset the notification is about
 * @param comment   additional text to include in the notification
 */
private void sendNotification(AuthenticatedUser authUser,
        UserNotification.Type type,
        Long datasetId,
        String comment) {
    if (authUser == null) {
        return;
    }
    Timestamp now = new Timestamp(new Date().getTime());
    userNotificationService.sendNotification(authUser, now, type, datasetId, comment);
}

public void deleteExternalUploadRecords(String taskId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* This class is used to store the state of an ongoing Globus task (transfer)
* as reported by the Globus task API.
*/

public class GlobusTaskState {

private String DATA_TYPE;
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/edu/harvard/iq/dataverse/globus/GlobusUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public static boolean isTaskCompleted(GlobusTaskState task) {
// TODO: "nice_status": "CONNECTION_FAILED" *may* mean
// that it's a Globus issue on the endnode side, that is
// in fact recoverable; should we add it to the list here?
// @todo: I'm tempted to just take "ACTIVE" at face value,
// and assume that the transfer is still ongoing.
if (task.getNice_status().equalsIgnoreCase("ok")
|| task.getNice_status().equalsIgnoreCase("queued")) {
return false;
Expand All @@ -61,6 +63,9 @@ public static boolean isTaskSucceeded(GlobusTaskState task) {
// has not completed *successfully*.
return false;
}
// @todo: should we be more careful here, and actually check for
// status.equalsI("SUCCEEDED") etc. before assuming the task
// did in fact succeed?
return true;
}
}
Expand Down Expand Up @@ -89,6 +94,7 @@ public static String getTaskStatus(GlobusTaskState task) {
status = "FAILED";
}
} else {
// @todo are we sure?
status = "FAILED";
}
return status;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,21 +53,30 @@ public void init() {
logger.info("Starting Globus task monitoring service");
int pollingInterval = SystemConfig.getIntLimitFromStringOrDefault(
settingsSvc.getValueForKey(SettingsServiceBean.Key.GlobusPollingInterval), 600);
this.scheduler.scheduleWithFixedDelay(this::checkOngoingTasks,
this.scheduler.scheduleWithFixedDelay(this::checkOngoingUploadTasks,
0, pollingInterval,
TimeUnit.SECONDS);

// A separate monitoring service for ongoing download tasks:
this.scheduler.scheduleWithFixedDelay(this::checkOngoingDownloadTasks,
0, 13 /*pollingInterval*/,
TimeUnit.SECONDS);

} else {
logger.info("Skipping Globus task monitor initialization");
}


}

/**
* This method will be executed on a timer-like schedule, continuously
* monitoring all the ongoing external Globus tasks (transfers).
* monitoring all the ongoing external Globus tasks (transfers TO remote
* Globus endnodes).
*/
public void checkOngoingTasks() {
logger.fine("Performing a scheduled external Globus task check");
List<GlobusTaskInProgress> tasks = globusService.findAllOngoingTasks();
public void checkOngoingUploadTasks() {
logger.fine("Performing a scheduled external Globus UPLOAD task check");
List<GlobusTaskInProgress> tasks = globusService.findAllOngoingTasks(GlobusTaskInProgress.TaskType.UPLOAD);

tasks.forEach(t -> {
FileHandler taskLogHandler = getTaskLogHandler(t);
Expand All @@ -76,7 +85,7 @@ public void checkOngoingTasks() {
GlobusTaskState retrieved = globusService.getTask(t.getGlobusToken(), t.getTaskId(), taskLogger);
if (GlobusUtil.isTaskCompleted(retrieved)) {
// Do our thing, finalize adding the files to the dataset
globusService.processCompletedTask(t, GlobusUtil.isTaskSucceeded(retrieved), GlobusUtil.getTaskStatus(retrieved), taskLogger);
globusService.processCompletedTask(t, retrieved, GlobusUtil.isTaskSucceeded(retrieved), GlobusUtil.getTaskStatus(retrieved), taskLogger);
// Whether it finished successfully, or failed in the process,
// there's no need to keep monitoring this task, so we can
// delete it.
Expand All @@ -92,6 +101,36 @@ public void checkOngoingTasks() {
});
}

/**
 * This method will be executed on a timer-like schedule, continuously
 * monitoring all the ongoing external Globus download tasks (transfers by
 * Dataverse users FROM remote, Dataverse-managed Globus endnodes).
 * For each task that the Globus API reports as completed, it finalizes
 * the task (notifications, ACL cleanup) and removes it from monitoring.
 */
public void checkOngoingDownloadTasks() {
    logger.fine("Performing a scheduled external Globus DOWNLOAD task check");
    List<GlobusTaskInProgress> tasks = globusService.findAllOngoingTasks(GlobusTaskInProgress.TaskType.DOWNLOAD);

    tasks.forEach(t -> {
        // Each task gets its own log file/handler, keyed by task metadata:
        FileHandler taskLogHandler = getTaskLogHandler(t);
        Logger taskLogger = getTaskLogger(t, taskLogHandler);

        GlobusTaskState retrieved = globusService.getTask(t.getGlobusToken(), t.getTaskId(), taskLogger);
        if (GlobusUtil.isTaskCompleted(retrieved)) {
            globusService.processCompletedTask(t, retrieved,
                    GlobusUtil.isTaskSucceeded(retrieved), GlobusUtil.getTaskStatus(retrieved), taskLogger);
            // Whether it finished successfully or failed, the task can now
            // be deleted.
            globusService.deleteTask(t);
        }

        if (taskLogHandler != null) {
            // @todo it should be prudent to cache these loggers and handlers
            // between monitoring runs (should be fairly easy to do)
            taskLogHandler.close();
        }
    });
}

private FileHandler getTaskLogHandler(GlobusTaskInProgress task) {
if (task == null) {
return null;
Expand All @@ -100,7 +139,10 @@ private FileHandler getTaskLogHandler(GlobusTaskInProgress task) {
Date startDate = new Date(task.getStartTime().getTime());
String logTimeStamp = logFormatter.format(startDate);

String logFileName = System.getProperty("com.sun.aas.instanceRoot") + File.separator + "logs" + File.separator + "globusUpload_" + task.getDataset().getId() + "_" + logTimeStamp
String logFileName = System.getProperty("com.sun.aas.instanceRoot")
+ File.separator + "logs"
+ File.separator + "globus" + task.getTaskType() + "_"
+ logTimeStamp + "_" + task.getDataset().getId()
+ ".log";
FileHandler fileHandler;
try {
Expand All @@ -120,7 +162,8 @@ private Logger getTaskLogger(GlobusTaskInProgress task, FileHandler logFileHandl
String logTimeStamp = logFormatter.format(startDate);

Logger taskLogger = Logger.getLogger(
"edu.harvard.iq.dataverse.upload.client.DatasetServiceBean." + "GlobusUpload" + logTimeStamp);
"edu.harvard.iq.dataverse.globus.GlobusServiceBean." + "Globus"
+ task.getTaskType() + logTimeStamp);
taskLogger.setUseParentHandlers(false);

taskLogger.addHandler(logFileHandler);
Expand Down

0 comments on commit 8bf4156

Please sign in to comment.