From f9e90e9a0a78f89f3ee27b365e2c21ce452336cc Mon Sep 17 00:00:00 2001 From: Leonid Andreev Date: Tue, 17 Dec 2024 16:34:15 -0500 Subject: [PATCH] more work in progress #11057 --- .../harvard/iq/dataverse/api/Datasets.java | 2 +- .../dataverse/globus/GlobusServiceBean.java | 259 +++++++++++++----- .../globus/GlobusTaskInProgress.java | 3 +- .../iq/dataverse/globus/GlobusTaskState.java | 2 + .../iq/dataverse/globus/GlobusUtil.java | 14 +- .../globus/TaskMonitoringServiceBean.java | 4 +- 6 files changed, 209 insertions(+), 75 deletions(-) diff --git a/src/main/java/edu/harvard/iq/dataverse/api/Datasets.java b/src/main/java/edu/harvard/iq/dataverse/api/Datasets.java index 02c0a610726..42ed6602072 100644 --- a/src/main/java/edu/harvard/iq/dataverse/api/Datasets.java +++ b/src/main/java/edu/harvard/iq/dataverse/api/Datasets.java @@ -4389,7 +4389,7 @@ public Response requestGlobusDownload(@Context ContainerRequestContext crc, @Pat case 400: return badRequest("Unable to grant permission"); case 409: - return conflict("Permission already exists"); + return conflict("Permission already exists or no more permissions allowed"); default: return error(null, "Unexpected error when granting permission"); } diff --git a/src/main/java/edu/harvard/iq/dataverse/globus/GlobusServiceBean.java b/src/main/java/edu/harvard/iq/dataverse/globus/GlobusServiceBean.java index 5f85aa0cd80..59dad72b1d9 100644 --- a/src/main/java/edu/harvard/iq/dataverse/globus/GlobusServiceBean.java +++ b/src/main/java/edu/harvard/iq/dataverse/globus/GlobusServiceBean.java @@ -119,11 +119,20 @@ public class GlobusServiceBean implements java.io.Serializable { private static final SimpleDateFormat logFormatter = new SimpleDateFormat("yyyy-MM-dd'T'HH-mm-ss"); private String getRuleId(GlobusEndpoint endpoint, String principal, String permissions) - throws MalformedURLException { + /*throws MalformedURLException*/ { + // @todo the method should probably swallow this MalformedURLException, rather + // than throw it? 
String principalType = "identity"; - - URL url = new URL("https://transfer.api.globusonline.org/v0.10/endpoint/" + endpoint.getId() + "/access_list"); + String apiUrlString = "https://transfer.api.globusonline.org/v0.10/endpoint/" + endpoint.getId() + "/access_list"; + URL url = null; + + try { + url = new URL(apiUrlString); + } catch (MalformedURLException mue) { + logger.severe("Malformed URL exception when attempting to look up ACL rule via Globus API: " + apiUrlString); + return null; + } MakeRequestResponse result = makeRequest(url, "Bearer", endpoint.getClientToken(), "GET", null); if (result.status == 200) { AccessList al = parseJson(result.jsonResponse, AccessList.class, false); @@ -160,21 +169,39 @@ private void deletePermission(String ruleId, Dataset dataset, Logger globusLogge if (dataset != null) { GlobusEndpoint endpoint = getGlobusEndpoint(dataset); if (endpoint != null) { - String accessToken = endpoint.getClientToken(); - globusLogger.info("Start deleting permissions."); - try { - URL url = new URL("https://transfer.api.globusonline.org/v0.10/endpoint/" + endpoint.getId() - + "/access/" + ruleId); - MakeRequestResponse result = makeRequest(url, "Bearer", accessToken, "DELETE", null); - if (result.status != 200) { - globusLogger.warning("Cannot delete access rule " + ruleId); - } else { - globusLogger.info("Access rule " + ruleId + " was deleted successfully"); - } - } catch (MalformedURLException ex) { - globusLogger.log(Level.WARNING, - "Failed to delete access rule " + ruleId + " on endpoint " + endpoint.getId(), ex); + deletePermission(ruleId, endpoint, globusLogger); + } + } + } + } + + /** + * Call to delete a globus rule, via the ruleId and supplied endpoint + * + * @param ruleId - Globus rule id - assumed to be associated with the + * dataset's file path (should not be called with a user + * specified rule id w/o further checking) + * @param endpoint - the Globus endpoint associated with the rule + * @param globusLogger - a separate logger 
instance; must not be null (it is dereferenced on the first line of the method) + */ + private void deletePermission(String ruleId, GlobusEndpoint endpoint, Logger globusLogger) { + globusLogger.fine("Start deleting rule " + ruleId + " for endpoint " + endpoint.getBasePath()); + if (ruleId.length() > 0) { + if (endpoint != null) { + String accessToken = endpoint.getClientToken(); + globusLogger.info("Start deleting permissions."); + try { + URL url = new URL("https://transfer.api.globusonline.org/v0.10/endpoint/" + endpoint.getId() + + "/access/" + ruleId); + MakeRequestResponse result = makeRequest(url, "Bearer", accessToken, "DELETE", null); + if (result.status != 200) { + globusLogger.warning("Cannot delete access rule " + ruleId); + } else { + globusLogger.info("Access rule " + ruleId + " was deleted successfully"); } + } catch (MalformedURLException ex) { + globusLogger.log(Level.WARNING, + "Failed to delete access rule " + ruleId + " on endpoint " + endpoint.getId(), ex); } } } @@ -209,6 +236,20 @@ public JsonObject requestAccessiblePaths(String principal, Dataset dataset, int } //The dir for the dataset's data exists, so try to request permission for the principal int requestPermStatus = requestPermission(endpoint, dataset, permissions); + + if (requestPermStatus == 409) { + // This is a special case - a 409 *may* mean that the rule already + // exists for this endpoint and for this user (if, for example, + // Dataverse failed to remove it after the last upload has completed). 
+ // That should be ok with us (but let's confirm that is indeed the + // case; alternatively it may mean that permissions cannot be issued + // for some other reason): + String ruleId = getRuleId(endpoint, principal, "rw"); + if (ruleId != null) { + requestPermStatus = 201; + } + } + response.add("status", requestPermStatus); if (requestPermStatus == 201) { String driverId = dataset.getEffectiveStorageDriverId(); @@ -461,21 +502,26 @@ public GlobusTaskState getTask(String accessToken, String taskId, Logger globusL MakeRequestResponse result = makeRequest(url, "Bearer", accessToken, "GET", null); - GlobusTaskState task = null; + GlobusTaskState taskState = null; if (result.status == 200) { - task = parseJson(result.jsonResponse, GlobusTaskState.class, false); + taskState = parseJson(result.jsonResponse, GlobusTaskState.class, false); } + // @todo some provision for a 403/permission denied here, due to an + // expired token maybe? + if (result.status != 200) { // @todo It should probably retry it 2-3 times before giving up; // similarly, it should probably differentiate between a "no such task" // response and something intermittent like a server/network error or // an expired token... i.e. something that's recoverable (?) + // edit: yes, but, should be done outside of this method, in the code + // that uses it myLogger.warning("Cannot find information for the task " + taskId + " : Reason : " + result.jsonResponse.toString()); } - return task; + return taskState; } /** @@ -638,7 +684,23 @@ public int setPermissionForDownload(Dataset dataset, String principal) { permissions.setPath(endpoint.getBasePath() + "/"); permissions.setPermissions("r"); - return requestPermission(endpoint, dataset, permissions); + // @todo: check specifically for a 409 here, which *may* indicate + // that the permission already exists on the collection - in which case + // we should confirm and proceed with the download, rather than give up. 
+ int status = requestPermission(endpoint, dataset, permissions); + + if (status == 409) { + // It's possible that the permission already exists (if, for example, + // Dataverse failed to delete it after the last download by this + // user, for whatever reason). We'll confirm that, and if that's the + // case, we'll just assume that it's ok to proceed with the download + // (rather than give up, as this code used to) + String ruleId = getRuleId(endpoint, principal, "r"); + if (ruleId != null) { + return 201; + } + } + return status; } // Generates the URL to launch the Globus app for upload @@ -760,15 +822,29 @@ public void globusUpload(JsonObject jsonData, Dataset dataset, String httpReques String taskIdentifier = jsonData.getString("taskIdentifier"); GlobusEndpoint endpoint = getGlobusEndpoint(dataset); + + // The first check on the status of the task: + // It is important to be careful here, and not give up on the task + // prematurely if anything goes wrong during this initial api call! + // So, perhaps a @todo - make a number of attempts until we get a + // valid response ? GlobusTaskState taskState = getTask(endpoint.getClientToken(), taskIdentifier, globusLogger); - String ruleId = getRuleId(endpoint, taskState.getOwner_id(), "rw"); - logger.fine("Found rule: " + ruleId); + String ruleId = taskState != null + ? getRuleId(endpoint, taskState.getOwner_id(), "rw") + : null; + if (ruleId != null) { + logger.fine("Found rule: " + ruleId); Long datasetId = rulesCache.getIfPresent(ruleId); if (datasetId != null) { - // Will not delete rule + // This will only "invalidate" the local cache entry, will not + // delete or invalidate the actual Globus rule rulesCache.invalidate(ruleId); } + } else { + // @todo warning, etc. 
+ // we'll proceed anyway, under the assumption that we will make + // another attempt to look it up later } // Wait before first check @@ -810,7 +886,7 @@ public void globusUpload(JsonObject jsonData, Dataset dataset, String httpReques // finish one way or another!) taskState = globusStatusCheck(endpoint, taskIdentifier, globusLogger); // @todo null check, or make sure it's never null - String taskStatus = GlobusUtil.getTaskStatus(taskState); + String taskStatus = GlobusUtil.getCompletedTaskStatus(taskState); boolean taskSuccess = GlobusUtil.isTaskCompleted(taskState); @@ -1204,30 +1280,57 @@ public void globusDownload(JsonObject jsonObject, Dataset dataset, User authUser // If the rules_cache times out, the permission will be deleted. Presumably that // doesn't affect a // globus task status check - GlobusTaskState taskState = getTask(endpoint.getClientToken(), taskIdentifier, globusLogger); - String ruleId = getRuleId(endpoint, taskState.getOwner_id(), "r"); + + // Wait before first check: + try { + Thread.sleep(3000); + } catch (InterruptedException ie) { + logger.warning("caught an Interrupted Exception while trying to sleep for 3 sec. in globusDownload()"); + } + + // The first check on the status of the task: + // It is important to be careful here, and not give up on the task + // prematurely if anything goes wrong during this initial api call! + + GlobusTaskState taskState = null; + + int retriesLimit = 3; + int retries = 0; + + while (taskState == null && retries < retriesLimit) { + taskState = getTask(endpoint.getClientToken(), taskIdentifier, globusLogger); + retries++; + try { + Thread.sleep(3000); + } catch (InterruptedException ie) { + logger.warning("caught an Interrupted Exception while trying to sleep for 3 sec. in globusDownload()"); + } + } + + String ruleId = taskState != null + ? 
getRuleId(endpoint, taskState.getOwner_id(), "r") + : null; + if (ruleId != null) { logger.fine("Found rule: " + ruleId); Long datasetId = rulesCache.getIfPresent(ruleId); if (datasetId != null) { - logger.fine("Deleting from cache: rule: " + ruleId); - // Will not delete rule + logger.fine("Deleting from local cache: rule: " + ruleId); + // This will only "invalidate" the local cache entry, will not + // delete or invalidate the actual Globus rule rulesCache.invalidate(ruleId); } } else { // Something is wrong - the rule should be there (a race with the cache timing // out?) - logger.warning("ruleId not found for taskId: " + taskIdentifier); - // @todo: do we need to bail out then, or ...? + logger.warning("ruleId not found for download taskId: " + taskIdentifier); + // We will proceed monitoring the transfer, even though the ruleId + // is null at the moment. The whole point of monitoring a download + // task is to remove the rule on the collection side once it's done, + // and we will need the rule id for that. But let's hope this was a + // temporary condition and we will eventually be able to look it up. } - - // Wait before first check - try { - Thread.sleep(3000); - } catch (InterruptedException ie) { - logger.warning("caught an Interrupted Exception while trying to sleep for 3 sec. 
in globusDownload()"); - } - + if (FeatureFlags.GLOBUS_USE_EXPERIMENTAL_ASYNC_FRAMEWORK.enabled()) { // Save the task information in the database so that the Globus monitoring @@ -1244,11 +1347,14 @@ public void globusDownload(JsonObject jsonObject, Dataset dataset, User authUser fileHandler.close(); - // return and forget + // return and forget; the Monitoring Service will pick it up on + // the next scheduled check return; } - // Check again: + // Old implementation: + // globusStatusCheck will loop continuously, until it determines that the + // task has completed - i.e., for the duration of the task taskState = globusStatusCheck(endpoint, taskIdentifier, globusLogger); processCompletedDownloadTask(taskState, @@ -1541,33 +1647,38 @@ public void processCompletedTask(GlobusTaskInProgress globusTask, GlobusTaskStat String ruleId = globusTask.getRuleId(); Dataset dataset = globusTask.getDataset(); AuthenticatedUser authUser = globusTask.getLocalUser(); + + switch (globusTask.getTaskType()) { - if (GlobusTaskInProgress.TaskType.UPLOAD.equals(globusTask.getTaskType())) { - List fileUploadsInProgress = findExternalUploadsByTaskId(globusTask.getTaskId()); + case UPLOAD: + List fileUploadsInProgress = findExternalUploadsByTaskId(globusTask.getTaskId()); - if (fileUploadsInProgress == null || fileUploadsInProgress.size() < 1) { - // @todo log error message; do nothing - // (will this ever happen though?) - return; - } + if (fileUploadsInProgress == null || fileUploadsInProgress.size() < 1) { + // @todo log error message; do nothing + // (will this ever happen though?) 
+ return; + } - JsonArrayBuilder filesJsonArrayBuilder = Json.createArrayBuilder(); + JsonArrayBuilder filesJsonArrayBuilder = Json.createArrayBuilder(); - for (ExternalFileUploadInProgress pendingFile : fileUploadsInProgress) { - String jsonInfoString = pendingFile.getFileInfo(); - JsonObject fileObject = JsonUtil.getJsonObject(jsonInfoString); - filesJsonArrayBuilder.add(fileObject); - } + for (ExternalFileUploadInProgress pendingFile : fileUploadsInProgress) { + String jsonInfoString = pendingFile.getFileInfo(); + JsonObject fileObject = JsonUtil.getJsonObject(jsonInfoString); + filesJsonArrayBuilder.add(fileObject); + } - JsonArray filesJsonArray = filesJsonArrayBuilder.build(); + JsonArray filesJsonArray = filesJsonArrayBuilder.build(); - processCompletedUploadTask(dataset, filesJsonArray, authUser, ruleId, taskLogger, taskSuccess, taskStatus); - } else if (GlobusTaskInProgress.TaskType.DOWNLOAD.equals(globusTask.getTaskType())) { - - processCompletedDownloadTask(taskState, authUser, dataset, ruleId, taskLogger); - - } else { - logger.warning("Unknown or null TaskType passed to processCompletedTask()"); + processCompletedUploadTask(dataset, filesJsonArray, authUser, ruleId, taskLogger, taskSuccess, taskStatus); + break; + + case DOWNLOAD: + + processCompletedDownloadTask(taskState, authUser, dataset, ruleId, taskLogger); + break; + + default: + logger.warning("Unknown or null TaskType passed to processCompletedTask()"); } } @@ -1579,13 +1690,29 @@ private void processCompletedDownloadTask(GlobusTaskState taskState, Logger taskLogger) { // The only thing to do on completion of a remote download // transfer is to delete the permission ACL that Dataverse - // had negotiated for the user before the task was initialized: + // had negotiated for the user before the task was initialized ... 
- if (ruleId != null) { - deletePermission(ruleId, dataset, taskLogger); + GlobusEndpoint endpoint = getGlobusEndpoint(dataset); + + if (endpoint != null) { + if (ruleId == null) { + // It is possible that, for whatever reason, we failed to look up + // the rule id when the monitoring of the task was initiated - but + // now that it has completed, let's try and look it up again: + //try { + getRuleId(endpoint, taskState.getOwner_id(), "r"); + //} catch (MalformedURLException mue) { + // taskLogger.warning("Malformed URL Exception when looking up the rule for download task " + taskState.getTask_id()); + // //@todo: the exception should probably be swallowed inside the method + //} + } + + if (ruleId != null) { + deletePermission(ruleId, endpoint, taskLogger); + } } - String taskStatus = GlobusUtil.getTaskStatus(taskState); + String taskStatus = GlobusUtil.getCompletedTaskStatus(taskState); // ... plus log the outcome and send any notifications: if (taskStatus.startsWith("FAILED") || taskStatus.startsWith("INACTIVE")) { diff --git a/src/main/java/edu/harvard/iq/dataverse/globus/GlobusTaskInProgress.java b/src/main/java/edu/harvard/iq/dataverse/globus/GlobusTaskInProgress.java index 8644bca6143..f1bbd99fa67 100644 --- a/src/main/java/edu/harvard/iq/dataverse/globus/GlobusTaskInProgress.java +++ b/src/main/java/edu/harvard/iq/dataverse/globus/GlobusTaskInProgress.java @@ -86,7 +86,8 @@ public String toString() { @JoinColumn private AuthenticatedUser user; - @Column(nullable=false) + // @Column(nullable=false) @todo we will need a flyway script in order to make + // this field nullable private String ruleId; @JoinColumn(nullable = false) diff --git a/src/main/java/edu/harvard/iq/dataverse/globus/GlobusTaskState.java b/src/main/java/edu/harvard/iq/dataverse/globus/GlobusTaskState.java index 2a53df74ee4..4bce2b823a6 100644 --- a/src/main/java/edu/harvard/iq/dataverse/globus/GlobusTaskState.java +++ b/src/main/java/edu/harvard/iq/dataverse/globus/GlobusTaskState.java @@ 
-17,6 +17,8 @@ public class GlobusTaskState { private boolean skip_source_errors; private String nice_status; private String nice_status_short_description; + // @todo: add fields for other potentially useful things, like the + // human-friendly name associated with the Globus account, etc. public String getDestination_endpoint_display_name() { return destination_endpoint_display_name; diff --git a/src/main/java/edu/harvard/iq/dataverse/globus/GlobusUtil.java b/src/main/java/edu/harvard/iq/dataverse/globus/GlobusUtil.java index fd36e2a27bc..aca1de3071e 100644 --- a/src/main/java/edu/harvard/iq/dataverse/globus/GlobusUtil.java +++ b/src/main/java/edu/harvard/iq/dataverse/globus/GlobusUtil.java @@ -40,7 +40,7 @@ public static boolean isTaskCompleted(GlobusTaskState task) { // that it's a Globus issue on the endnode side, that is // in fact recoverable; should we add it to the list here? // @todo: I'm tempted to just take "ACTIVE" for face value, - // and assume that it's still ongoing. + // and ALWAYS assume that it's still ongoing. if (task.getNice_status().equalsIgnoreCase("ok") || task.getNice_status().equalsIgnoreCase("queued")) { return false; @@ -60,11 +60,15 @@ public static boolean isTaskSucceeded(GlobusTaskState task) { if (status.equals("ACTIVE") || status.startsWith("FAILED") || status.startsWith("INACTIVE")) { // There are cases where a failed task may still be showing // as "ACTIVE". But it is definitely safe to assume that it - // has not completed *successfully*. + // has not completed *successfully*. (The key here is that + // this method is only called on tasks that have been determined + // to be completed for all practical purposes - which in + // some cases may include tasks still showing as "ACTIVE" + // in the Globus API output - L.A.) return false; } // @todo: should we be more careful here, and actually check for - // status.equalsI("SUCCEEDED") etc. before assuming the task + // status.equals("SUCCEEDED") etc. 
before assuming the task // did in fact succeed? return true; } @@ -75,7 +79,7 @@ public static boolean isTaskSucceeded(GlobusTaskState task) { * Produces a human-readable Status label of a completed task * @param GlobusTaskState task - a looked-up state of a task as reported by Globus API */ - public static String getTaskStatus(GlobusTaskState task) { + public static String getCompletedTaskStatus(GlobusTaskState task) { String status = null; if (task != null) { status = task.getStatus(); @@ -83,7 +87,7 @@ public static String getTaskStatus(GlobusTaskState task) { // The task is in progress but is not ok or queued // (L.A.) I think the assumption here is that this method is called // exclusively on tasks that have already completed. So that's why - // it is safe to assume that "ACTIVE" means "FAILED". + // the code below assumes that "ACTIVE" means "FAILED". if (status.equalsIgnoreCase("ACTIVE")) { status = "FAILED" + "#" + task.getNice_status() + "#" + task.getNice_status_short_description(); } else { diff --git a/src/main/java/edu/harvard/iq/dataverse/globus/TaskMonitoringServiceBean.java b/src/main/java/edu/harvard/iq/dataverse/globus/TaskMonitoringServiceBean.java index 60e24d62702..27bcbaf92a0 100644 --- a/src/main/java/edu/harvard/iq/dataverse/globus/TaskMonitoringServiceBean.java +++ b/src/main/java/edu/harvard/iq/dataverse/globus/TaskMonitoringServiceBean.java @@ -85,7 +85,7 @@ public void checkOngoingUploadTasks() { GlobusTaskState retrieved = globusService.getTask(t.getGlobusToken(), t.getTaskId(), taskLogger); if (GlobusUtil.isTaskCompleted(retrieved)) { // Do our thing, finalize adding the files to the dataset - globusService.processCompletedTask(t, retrieved, GlobusUtil.isTaskSucceeded(retrieved), GlobusUtil.getTaskStatus(retrieved), taskLogger); + globusService.processCompletedTask(t, retrieved, GlobusUtil.isTaskSucceeded(retrieved), GlobusUtil.getCompletedTaskStatus(retrieved), taskLogger); // Whether it finished successfully, or failed in the 
process, // there's no need to keep monitoring this task, so we can // delete it. @@ -116,7 +116,7 @@ public void checkOngoingDownloadTasks() { GlobusTaskState retrieved = globusService.getTask(t.getGlobusToken(), t.getTaskId(), taskLogger); if (GlobusUtil.isTaskCompleted(retrieved)) { - globusService.processCompletedTask(t, retrieved, GlobusUtil.isTaskSucceeded(retrieved), GlobusUtil.getTaskStatus(retrieved), taskLogger); + globusService.processCompletedTask(t, retrieved, GlobusUtil.isTaskSucceeded(retrieved), GlobusUtil.getCompletedTaskStatus(retrieved), taskLogger); // globusService.processCompletedTask(t, GlobusUtil.isTaskSucceeded(retrieved), GlobusUtil.getTaskStatus(retrieved), taskLogger); // Whether it finished successfully or failed, the task can now // be deleted.