Skip to content

Commit

Permalink
Recall final status transitions handled more gracefully
Browse files Browse the repository at this point in the history
Fix for STOR-1177
  • Loading branch information
enricovianello committed May 11, 2020
1 parent eb16915 commit 57105ce
Show file tree
Hide file tree
Showing 4 changed files with 250 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,9 @@ public TapeRecallCatalog() {
*
* @param groupTaskId @param newValue
*/
public void changeGroupTaskRetryValue(UUID groupTaskId, int newValue) {
public void changeGroupTaskRetryValue(UUID groupTaskId, int newValue) throws DataAccessException {

try {
tapeRecallDAO.setGroupTaskRetryValue(groupTaskId, newValue);
} catch (DataAccessException e) {
log.error("Unable to takeover a task", e);
}
tapeRecallDAO.setGroupTaskRetryValue(groupTaskId, newValue);
}

/**
Expand Down
44 changes: 29 additions & 15 deletions src/main/java/it/grid/storm/tape/recalltable/resources/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,18 @@ This interface expose operations on recall tasks.

Recall tasks have the following fields

- groupTaskId
- insertionInstant the time at which the task was created
- requestType the type of request that originated the recall task, either bol or ptg
- fileName the local path of the file to recall
- voName the virtual organization
- userID
- retryAttempt the number of times GEMMS will retry to recall the file in case of failure
- status the status of the recall task, either 0 (success), 1 (queued), 2 (in-progress), 3 (error), 4 (aborted), 5 (undefined)
- deferredRecallInstant
- pinLifetime
- requestToken the token of the request (the bol or ptg) that originated the recall task
- inProgressInstant the time at which the task status was changed to in progress
- `groupTaskId`, each recall task that points to the same resource (same `fileName`) has the same `groupTaskId`;
- `insertionInstant`, the time at which the task was created;
- `requestType`, the type of request that originated the recall task: bol or ptg;
- `fileName`, the local path of the file to recall;
- `voName`, the virtual organization name;
- `userID`, the DN of the user that has submitted the task;
- `retryAttempt`, the number of times GEMMS will retry to recall the file in case of failure;
- `status`, the status of the recall task (see the available values into [status table](#recall-task-statuses) in appendix;
- `deferredRecallInstant`
- `pinLifetime`
- `requestToken`, the token of the request (bol or ptg) that originated the recall task;
- `inProgressInstant`, the time at which the task status was changed to in progress.

Recall tasks are transferred using a text/plain representation. Fields are separated by a tab, recall tasks are separated by a #, and all tasks are included by curly brackets as in

Expand Down Expand Up @@ -165,9 +165,9 @@ and the corresponding response would be

## PUT /recalltable/task/{groupTaskId}

Update a recall task status.
Update status of all the recall tasks of the same resources, grouped by groupTaskId.

The status is passed in the body of the PUT request encoded as follows
The status is passed in the body of the PUT request encoded as follow:

status=0

Expand Down Expand Up @@ -251,4 +251,18 @@ In your `storm.properties` set:
```
rest.token.enabled = true
rest.token.value = your-secret-token
```
```

## Recall Task statuses

Recall tasks status table:

| code | status |
|:----:|:-----------:|
| 0 | success |
| 1 | queued |
| 2 | in-progress |
| 3 | error |
| 4 | aborted |
| 5 | undefined |

216 changes: 125 additions & 91 deletions src/main/java/it/grid/storm/tape/recalltable/resources/TaskResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,40 +19,26 @@
package it.grid.storm.tape.recalltable.resources;

import static it.grid.storm.persistence.model.TapeRecallTO.RecallTaskType.RCLL;
import static it.grid.storm.tape.recalltable.model.TapeRecallStatus.getRecallTaskStatus;
import static java.lang.String.format;
import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
import static javax.ws.rs.core.MediaType.TEXT_PLAIN_TYPE;
import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
import static javax.ws.rs.core.Response.Status.NOT_FOUND;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import it.grid.storm.config.Configuration;
import it.grid.storm.namespace.NamespaceDirector;
import it.grid.storm.namespace.NamespaceException;
import it.grid.storm.namespace.NamespaceInterface;
import it.grid.storm.namespace.StoRI;
import it.grid.storm.persistence.exceptions.DataAccessException;
import it.grid.storm.persistence.model.TapeRecallTO;
import it.grid.storm.rest.metadata.service.ResourceNotFoundException;
import it.grid.storm.rest.metadata.service.ResourceService;
import it.grid.storm.tape.recalltable.TapeRecallCatalog;
import it.grid.storm.tape.recalltable.TapeRecallException;
import it.grid.storm.tape.recalltable.model.PutTapeRecallStatusLogic;
import it.grid.storm.tape.recalltable.model.PutTapeRecallStatusValidator;
import it.grid.storm.tape.recalltable.model.TapeRecallStatus;
import it.grid.storm.tape.recalltable.model.TaskInsertRequestValidator;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.AbstractMap.SimpleEntry;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
Expand All @@ -65,6 +51,30 @@
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.GenericEntity;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import it.grid.storm.config.Configuration;
import it.grid.storm.namespace.NamespaceDirector;
import it.grid.storm.namespace.NamespaceException;
import it.grid.storm.namespace.NamespaceInterface;
import it.grid.storm.namespace.StoRI;
import it.grid.storm.persistence.exceptions.DataAccessException;
import it.grid.storm.persistence.model.TapeRecallTO;
import it.grid.storm.rest.metadata.service.ResourceNotFoundException;
import it.grid.storm.rest.metadata.service.ResourceService;
import it.grid.storm.tape.recalltable.TapeRecallCatalog;
import it.grid.storm.tape.recalltable.TapeRecallException;
import it.grid.storm.tape.recalltable.model.PutTapeRecallStatusLogic;
import it.grid.storm.tape.recalltable.model.PutTapeRecallStatusValidator;
import it.grid.storm.tape.recalltable.model.TapeRecallStatus;
import it.grid.storm.tape.recalltable.model.TaskInsertRequestValidator;
import jersey.repackaged.com.google.common.collect.Lists;

/**
* @author Riccardo Zappi
Expand Down Expand Up @@ -169,112 +179,94 @@ public Response putTaskStatus(InputStream input) {
/**
* Updates the status or retry value of a recall task. Called by GEMSS after a recall tasks is
* finished.
*
*/
@PUT
@Path("/{groupTaskId}")
@Consumes("text/plain")
public void putNewTaskStatusOrRetryValue(@PathParam("groupTaskId") UUID groupTaskId,
InputStream input) throws TapeRecallException {
public Response putNewTaskStatusOrRetryValue(@PathParam("groupTaskId") UUID groupTaskId,
InputStream input) {

log.debug("Requested to change recall table value for groupTaskId {}", groupTaskId);

List<String> lines;
try {
lines = getAllLines(input);
} catch (IOException e) {
String message = format("Unable to read entity lines: %s", e.getMessage());
log.error(message, e);
return Response.ok(message, TEXT_PLAIN_TYPE).status(Status.INTERNAL_SERVER_ERROR).build();
}

log.debug("Requested to change recall table value for taskId {}", groupTaskId);
if (lines.isEmpty() || lines.size() > 1) {

String inputStr = buildInputString(input);
String message = format("Invalid body '%s'", lines);
return Response.ok(message, TEXT_PLAIN_TYPE).status(Status.BAD_REQUEST).build();
}

log.debug("@PUT (input string) = '{}'", inputStr);
Optional<SimpleEntry<String, Integer>> keyValue = getKeyValue(lines.get(0));

// Retrieve Tasks corresponding to taskId
// - the relationship between groupTaskId and entries within the DB is
// one-to-many
if (!keyValue.isPresent()) {

String errorStr = null;
String message = format("Invalid body '%s'", lines);
return Response.ok(message, TEXT_PLAIN_TYPE).status(Status.BAD_REQUEST).build();
}

try {

if (!recallCatalog.existsGroupTask(groupTaskId)) {

log.info(
"Received a tape recall status update but no Recall Group Task found with ID = '{}'",
groupTaskId);

throw new TapeRecallException("No Recall Group Task found with ID = '" + groupTaskId + "'");
String message = format("No Recall Group Task found with ID = '%s'", groupTaskId);
log.info(message);
return Response.ok(message, TEXT_PLAIN_TYPE).status(Status.NOT_FOUND).build();
}

} catch (DataAccessException e) {

log.error("Unable to retrieve Recall Group Task with ID = '{}' DataAccessException: {}",
groupTaskId, e.getMessage(), e);

throw new TapeRecallException("Unable to retrieve recall group task " + "with ID = '"
+ groupTaskId + "' " + e.getMessage());
String message = format("Unable to retrieve Recall Group Task with ID = '%s' %s", groupTaskId,
e.getMessage());
log.error(message, e);
return Response.ok(message, TEXT_PLAIN_TYPE).status(Status.INTERNAL_SERVER_ERROR).build();
}

String keyRetryValue = config.getRetryValueKey();
String keyStatus = config.getStatusKey();
String key = keyValue.get().getKey();
Integer value = keyValue.get().getValue();

int eqIndex = inputStr.indexOf('=');

String value = null;
String key = null;

if (eqIndex > 0) {

value = inputStr.substring(eqIndex);
key = inputStr.substring(0, eqIndex);

} else {

errorStr = "Body '" + inputStr + "'is wrong";
throw new TapeRecallException(errorStr);
}

int intValue;

try {

// trim out the '\n' end.
intValue = Integer.valueOf(value.substring(1, value.length() - 1));

} catch (NumberFormatException e) {

errorStr = "Unable to understand the number value = '" + value + "'";
throw new TapeRecallException(errorStr);
}

if (key.equals(keyRetryValue)) { // **** Set the Retry value
if (key.equalsIgnoreCase(keyStatus)) { // **** Set the Status

log.debug("Changing retry attempt of task {} to {}", groupTaskId, intValue);
log.debug("Changing status of task {} to {}", groupTaskId, value);

recallCatalog.changeGroupTaskRetryValue(groupTaskId, intValue);
try {

} else {
TapeRecallStatus newStatus = getRecallTaskStatus(value);
recallCatalog.changeGroupTaskStatus(groupTaskId, newStatus, new Date());

if (key.equals(keyStatus)) { // **** Set the Status
log.debug("Changing status of task {} to {}", groupTaskId, intValue);
} catch (DataAccessException e) {

try {
String message = format(
"Unable to change the status for group task id %s to status %s DataAccessException : %s",
groupTaskId, value, e.getMessage());
log.error(message, e);
return Response.ok(message, TEXT_PLAIN_TYPE).status(Status.INTERNAL_SERVER_ERROR).build();
}

recallCatalog.changeGroupTaskStatus(groupTaskId,
TapeRecallStatus.getRecallTaskStatus(intValue), new Date());
} else { // inputMap.containsKey(keyRetryValue)) **** Set the Retry value

} catch (DataAccessException e) {
log.debug("Changing retry attempt of task {} to {}", groupTaskId, value);

log.error(
"Unable to change the status for group task id {} to status {} DataAccessException : {}",
groupTaskId, intValue, e.getMessage(), e);
try {

throw new TapeRecallException(
"Unable to change the status for group task id " + groupTaskId + " to status "
+ intValue + " . DataAccessException : " + e.getMessage());
}
recallCatalog.changeGroupTaskRetryValue(groupTaskId, value);

} else {
} catch (DataAccessException e) {

errorStr = "Unable to understand the key = '" + key + "' in @PUT request.";
String message = format("Unable to takeover a task: %s", e.getMessage());
return Response.ok(message, TEXT_PLAIN_TYPE).status(Status.INTERNAL_SERVER_ERROR).build();

throw new TapeRecallException(errorStr);
}
}

return Response.noContent().build();
}

/**
Expand Down Expand Up @@ -451,4 +443,46 @@ private String buildInputString(InputStream input) {
return sb.toString();
}

private Optional<SimpleEntry<String, Integer>> getKeyValue(String line) {

Pattern pStatus = Pattern.compile(config.getStatusKey() + "=[0-9]+");
Pattern pAttempt = Pattern.compile(config.getRetryValueKey() + "=[0-9]+");

SimpleEntry<String, Integer> entry = null;

Matcher mStatus = pStatus.matcher(line);
Matcher mAttempt = pAttempt.matcher(line);

if (mStatus.matches() || mAttempt.matches()) {
String key = line.split("=")[0];
Integer value = Integer.valueOf(line.split("=")[1]);
entry = new SimpleEntry<String, Integer>(key, value);
} else {
log.warn("got invalid key-value data = {}", line);
}

return Optional.ofNullable(entry);
}


public List<String> getAllLines(InputStream input) throws IOException {

List<String> lines = Lists.newArrayList();
try (BufferedReader br = new BufferedReader(new InputStreamReader(input))) {
String line = null;
while ((line = br.readLine()) != null) {
lines.add(line);
}
} catch (IOException e) {
log.error(e.getMessage(), e);
} finally {
try {
input.close();
} catch (IOException e) {
log.error(e.getMessage(), e);
}
}
return lines;
}

}
Loading

0 comments on commit 57105ce

Please sign in to comment.