From 57105ce3fd6a922ae5dc0fbce5285622de73cf43 Mon Sep 17 00:00:00 2001 From: enricovianello Date: Sun, 10 May 2020 12:27:14 +0200 Subject: [PATCH] Recall final status transitions handled more gracefully Fix for STOR-1177 --- .../tape/recalltable/TapeRecallCatalog.java | 8 +- .../tape/recalltable/resources/README.md | 44 ++-- .../recalltable/resources/TaskResource.java | 216 ++++++++++-------- .../resources/TaskResourceTest.java | 94 ++++++++ 4 files changed, 250 insertions(+), 112 deletions(-) diff --git a/src/main/java/it/grid/storm/tape/recalltable/TapeRecallCatalog.java b/src/main/java/it/grid/storm/tape/recalltable/TapeRecallCatalog.java index 648055c1..2d35dac6 100644 --- a/src/main/java/it/grid/storm/tape/recalltable/TapeRecallCatalog.java +++ b/src/main/java/it/grid/storm/tape/recalltable/TapeRecallCatalog.java @@ -64,13 +64,9 @@ public TapeRecallCatalog() { * * @param groupTaskId @param newValue */ - public void changeGroupTaskRetryValue(UUID groupTaskId, int newValue) { + public void changeGroupTaskRetryValue(UUID groupTaskId, int newValue) throws DataAccessException { - try { - tapeRecallDAO.setGroupTaskRetryValue(groupTaskId, newValue); - } catch (DataAccessException e) { - log.error("Unable to takeover a task", e); - } + tapeRecallDAO.setGroupTaskRetryValue(groupTaskId, newValue); } /** diff --git a/src/main/java/it/grid/storm/tape/recalltable/resources/README.md b/src/main/java/it/grid/storm/tape/recalltable/resources/README.md index 54169be9..6fe2be7d 100644 --- a/src/main/java/it/grid/storm/tape/recalltable/resources/README.md +++ b/src/main/java/it/grid/storm/tape/recalltable/resources/README.md @@ -5,18 +5,18 @@ This interface expose operations on recall tasks. Recall tasks have the following fields -- groupTaskId -- insertionInstant the time at which the task was created -- requestType the type of request that originated the recall task, either bol or ptg -- fileName the local path of the file to recall -- voName the virtual organization -- userID -- retryAttempt the number of times GEMMS will retry to recall the file in case of failure -- status the status of the recall task, either 0 (success), 1 (queued), 2 (in-progress), 3 (error), 4 (aborted), 5 (undefined) -- deferredRecallInstant -- pinLifetime -- requestToken the token of the request (the bol or ptg) that originated the recall task -- inProgressInstant the time at which the task status was changed to in progress +- `groupTaskId`, each recall task that points to the same resource (same `fileName`) has the same `groupTaskId`; +- `insertionInstant`, the time at which the task was created; +- `requestType`, the type of request that originated the recall task: bol or ptg; +- `fileName`, the local path of the file to recall; +- `voName`, the virtual organization name; +- `userID`, the DN of the user that has submitted the task; +- `retryAttempt`, the number of times GEMMS will retry to recall the file in case of failure; +- `status`, the status of the recall task (see the available values into [status table](#recall-task-statuses) in appendix; +- `deferredRecallInstant` +- `pinLifetime` +- `requestToken`, the token of the request (bol or ptg) that originated the recall task; +- `inProgressInstant`, the time at which the task status was changed to in progress. Recall tasks are transferred using a text/plain representation. Fields are separated by a tab, recall tasks are separated by a #, and all tasks are included by curly brackets as in @@ -165,9 +165,9 @@ and the corresponding response would be ## PUT /recalltable/task/{groupTaskId} -Update a recall task status. +Update status of all the recall tasks of the same resources, grouped by groupTaskId. -The status is passed in the body of the PUT request encoded as follows +The status is passed in the body of the PUT request encoded as follow: status=0 @@ -251,4 +251,18 @@ In your `storm.properties` set: ``` rest.token.enabled = true rest.token.value = your-secret-token -``` \ No newline at end of file +``` + +## Recall Task statuses + +Recall tasks status table: + +| code | status | +|:----:|:-----------:| +| 0 | success | +| 1 | queued | +| 2 | in-progress | +| 3 | error | +| 4 | aborted | +| 5 | undefined | + diff --git a/src/main/java/it/grid/storm/tape/recalltable/resources/TaskResource.java b/src/main/java/it/grid/storm/tape/recalltable/resources/TaskResource.java index 7a2196f6..701bfa47 100644 --- a/src/main/java/it/grid/storm/tape/recalltable/resources/TaskResource.java +++ b/src/main/java/it/grid/storm/tape/recalltable/resources/TaskResource.java @@ -19,40 +19,26 @@ package it.grid.storm.tape.recalltable.resources; import static it.grid.storm.persistence.model.TapeRecallTO.RecallTaskType.RCLL; +import static it.grid.storm.tape.recalltable.model.TapeRecallStatus.getRecallTaskStatus; +import static java.lang.String.format; import static javax.ws.rs.core.MediaType.APPLICATION_JSON; +import static javax.ws.rs.core.MediaType.TEXT_PLAIN_TYPE; import static javax.ws.rs.core.Response.Status.BAD_REQUEST; import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR; import static javax.ws.rs.core.Response.Status.NOT_FOUND; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import it.grid.storm.config.Configuration; -import it.grid.storm.namespace.NamespaceDirector; -import it.grid.storm.namespace.NamespaceException; -import it.grid.storm.namespace.NamespaceInterface; -import it.grid.storm.namespace.StoRI; -import it.grid.storm.persistence.exceptions.DataAccessException; -import it.grid.storm.persistence.model.TapeRecallTO; -import it.grid.storm.rest.metadata.service.ResourceNotFoundException; -import it.grid.storm.rest.metadata.service.ResourceService; -import it.grid.storm.tape.recalltable.TapeRecallCatalog; -import it.grid.storm.tape.recalltable.TapeRecallException; -import it.grid.storm.tape.recalltable.model.PutTapeRecallStatusLogic; -import it.grid.storm.tape.recalltable.model.PutTapeRecallStatusValidator; -import it.grid.storm.tape.recalltable.model.TapeRecallStatus; -import it.grid.storm.tape.recalltable.model.TaskInsertRequestValidator; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.net.URI; +import java.util.AbstractMap.SimpleEntry; import java.util.Date; import java.util.List; +import java.util.Optional; import java.util.UUID; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import javax.ws.rs.Consumes; import javax.ws.rs.GET; @@ -65,6 +51,30 @@ import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.GenericEntity; import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +import it.grid.storm.config.Configuration; +import it.grid.storm.namespace.NamespaceDirector; +import it.grid.storm.namespace.NamespaceException; +import it.grid.storm.namespace.NamespaceInterface; +import it.grid.storm.namespace.StoRI; +import it.grid.storm.persistence.exceptions.DataAccessException; +import it.grid.storm.persistence.model.TapeRecallTO; +import it.grid.storm.rest.metadata.service.ResourceNotFoundException; +import it.grid.storm.rest.metadata.service.ResourceService; +import it.grid.storm.tape.recalltable.TapeRecallCatalog; +import it.grid.storm.tape.recalltable.TapeRecallException; +import it.grid.storm.tape.recalltable.model.PutTapeRecallStatusLogic; +import it.grid.storm.tape.recalltable.model.PutTapeRecallStatusValidator; +import it.grid.storm.tape.recalltable.model.TapeRecallStatus; +import it.grid.storm.tape.recalltable.model.TaskInsertRequestValidator; +import jersey.repackaged.com.google.common.collect.Lists; /** * @author Riccardo Zappi @@ -169,112 +179,94 @@ public Response putTaskStatus(InputStream input) { /** * Updates the status or retry value of a recall task. Called by GEMSS after a recall tasks is * finished. - * */ @PUT @Path("/{groupTaskId}") @Consumes("text/plain") - public void putNewTaskStatusOrRetryValue(@PathParam("groupTaskId") UUID groupTaskId, - InputStream input) throws TapeRecallException { + public Response putNewTaskStatusOrRetryValue(@PathParam("groupTaskId") UUID groupTaskId, + InputStream input) { + + log.debug("Requested to change recall table value for groupTaskId {}", groupTaskId); + + List lines; + try { + lines = getAllLines(input); + } catch (IOException e) { + String message = format("Unable to read entity lines: %s", e.getMessage()); + log.error(message, e); + return Response.ok(message, TEXT_PLAIN_TYPE).status(Status.INTERNAL_SERVER_ERROR).build(); + } - log.debug("Requested to change recall table value for taskId {}", groupTaskId); + if (lines.isEmpty() || lines.size() > 1) { - String inputStr = buildInputString(input); + String message = format("Invalid body '%s'", lines); + return Response.ok(message, TEXT_PLAIN_TYPE).status(Status.BAD_REQUEST).build(); + } - log.debug("@PUT (input string) = '{}'", inputStr); + Optional> keyValue = getKeyValue(lines.get(0)); - // Retrieve Tasks corresponding to taskId - // - the relationship between groupTaskId and entries within the DB is - // one-to-many + if (!keyValue.isPresent()) { - String errorStr = null; + String message = format("Invalid body '%s'", lines); + return Response.ok(message, TEXT_PLAIN_TYPE).status(Status.BAD_REQUEST).build(); + } try { if (!recallCatalog.existsGroupTask(groupTaskId)) { - log.info( - "Received a tape recall status update but no Recall Group Task found with ID = '{}'", - groupTaskId); - - throw new TapeRecallException("No Recall Group Task found with ID = '" + groupTaskId + "'"); + String message = format("No Recall Group Task found with ID = '%s'", groupTaskId); + log.info(message); + return Response.ok(message, TEXT_PLAIN_TYPE).status(Status.NOT_FOUND).build(); } } catch (DataAccessException e) { - log.error("Unable to retrieve Recall Group Task with ID = '{}' DataAccessException: {}", - groupTaskId, e.getMessage(), e); - - throw new TapeRecallException("Unable to retrieve recall group task " + "with ID = '" - + groupTaskId + "' " + e.getMessage()); + String message = format("Unable to retrieve Recall Group Task with ID = '%s' %s", groupTaskId, + e.getMessage()); + log.error(message, e); + return Response.ok(message, TEXT_PLAIN_TYPE).status(Status.INTERNAL_SERVER_ERROR).build(); } - String keyRetryValue = config.getRetryValueKey(); String keyStatus = config.getStatusKey(); + String key = keyValue.get().getKey(); + Integer value = keyValue.get().getValue(); - int eqIndex = inputStr.indexOf('='); - - String value = null; - String key = null; - - if (eqIndex > 0) { - - value = inputStr.substring(eqIndex); - key = inputStr.substring(0, eqIndex); - - } else { - - errorStr = "Body '" + inputStr + "'is wrong"; - throw new TapeRecallException(errorStr); - } - - int intValue; - - try { - - // trim out the '\n' end. - intValue = Integer.valueOf(value.substring(1, value.length() - 1)); - - } catch (NumberFormatException e) { - - errorStr = "Unable to understand the number value = '" + value + "'"; - throw new TapeRecallException(errorStr); - } - - if (key.equals(keyRetryValue)) { // **** Set the Retry value + if (key.equalsIgnoreCase(keyStatus)) { // **** Set the Status - log.debug("Changing retry attempt of task {} to {}", groupTaskId, intValue); + log.debug("Changing status of task {} to {}", groupTaskId, value); - recallCatalog.changeGroupTaskRetryValue(groupTaskId, intValue); + try { - } else { + TapeRecallStatus newStatus = getRecallTaskStatus(value); + recallCatalog.changeGroupTaskStatus(groupTaskId, newStatus, new Date()); - if (key.equals(keyStatus)) { // **** Set the Status - log.debug("Changing status of task {} to {}", groupTaskId, intValue); + } catch (DataAccessException e) { - try { + String message = format( + "Unable to change the status for group task id %s to status %s DataAccessException : %s", + groupTaskId, value, e.getMessage()); + log.error(message, e); + return Response.ok(message, TEXT_PLAIN_TYPE).status(Status.INTERNAL_SERVER_ERROR).build(); + } - recallCatalog.changeGroupTaskStatus(groupTaskId, - TapeRecallStatus.getRecallTaskStatus(intValue), new Date()); + } else { // inputMap.containsKey(keyRetryValue)) **** Set the Retry value - } catch (DataAccessException e) { + log.debug("Changing retry attempt of task {} to {}", groupTaskId, value); - log.error( - "Unable to change the status for group task id {} to status {} DataAccessException : {}", - groupTaskId, intValue, e.getMessage(), e); + try { - throw new TapeRecallException( - "Unable to change the status for group task id " + groupTaskId + " to status " - + intValue + " . DataAccessException : " + e.getMessage()); - } + recallCatalog.changeGroupTaskRetryValue(groupTaskId, value); - } else { + } catch (DataAccessException e) { - errorStr = "Unable to understand the key = '" + key + "' in @PUT request."; + String message = format("Unable to takeover a task: %s", e.getMessage()); + return Response.ok(message, TEXT_PLAIN_TYPE).status(Status.INTERNAL_SERVER_ERROR).build(); - throw new TapeRecallException(errorStr); } } + + return Response.noContent().build(); } /** @@ -451,4 +443,46 @@ private String buildInputString(InputStream input) { return sb.toString(); } + private Optional> getKeyValue(String line) { + + Pattern pStatus = Pattern.compile(config.getStatusKey() + "=[0-9]+"); + Pattern pAttempt = Pattern.compile(config.getRetryValueKey() + "=[0-9]+"); + + SimpleEntry entry = null; + + Matcher mStatus = pStatus.matcher(line); + Matcher mAttempt = pAttempt.matcher(line); + + if (mStatus.matches() || mAttempt.matches()) { + String key = line.split("=")[0]; + Integer value = Integer.valueOf(line.split("=")[1]); + entry = new SimpleEntry(key, value); + } else { + log.warn("got invalid key-value data = {}", line); + } + + return Optional.ofNullable(entry); + } + + + public List getAllLines(InputStream input) throws IOException { + + List lines = Lists.newArrayList(); + try (BufferedReader br = new BufferedReader(new InputStreamReader(input))) { + String line = null; + while ((line = br.readLine()) != null) { + lines.add(line); + } + } catch (IOException e) { + log.error(e.getMessage(), e); + } finally { + try { + input.close(); + } catch (IOException e) { + log.error(e.getMessage(), e); + } + } + return lines; + } + } diff --git a/src/test/java/it/grid/storm/tape/recalltable/resources/TaskResourceTest.java b/src/test/java/it/grid/storm/tape/recalltable/resources/TaskResourceTest.java index e944e568..f8030e2d 100644 --- a/src/test/java/it/grid/storm/tape/recalltable/resources/TaskResourceTest.java +++ b/src/test/java/it/grid/storm/tape/recalltable/resources/TaskResourceTest.java @@ -6,6 +6,7 @@ import static javax.ws.rs.core.Response.Status.CREATED; import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR; import static javax.ws.rs.core.Response.Status.NOT_FOUND; +import static javax.ws.rs.core.Response.Status.NO_CONTENT; import static javax.ws.rs.core.Response.Status.OK; import static junit.framework.Assert.assertNotNull; import static org.hamcrest.Matchers.equalTo; @@ -14,6 +15,7 @@ import static org.junit.Assert.fail; import java.io.IOException; +import java.io.InputStream; import java.net.URI; import java.util.ArrayList; import java.util.Date; @@ -23,6 +25,7 @@ import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.Response; +import org.apache.commons.io.IOUtils; import org.junit.Test; import org.mockito.Mockito; @@ -389,4 +392,95 @@ public void testGETTasksInProgressEmpty() } + private TapeRecallCatalog getTapeRecallCatalogGroupTaskId(UUID groupTaskId, + boolean existsResponse) throws DataAccessException { + + TapeRecallCatalog catalog = Mockito.mock(TapeRecallCatalog.class); + Mockito.when(catalog.existsGroupTask(groupTaskId)).thenReturn(existsResponse); + return catalog; + } + + @Test + public void testPUTTaskStatusToSuccessWorks() + throws NamespaceException, ResourceNotFoundException, DataAccessException { + + final String BODY = "status=0"; + InputStream stubInputStream = IOUtils.toInputStream(BODY); + UUID groupTaskId = UUID.randomUUID(); + TaskResource recallEndpoint = getTaskResource(getResourceService(STORI), + getTapeRecallCatalogGroupTaskId(groupTaskId, true)); + Response res = recallEndpoint.putNewTaskStatusOrRetryValue(groupTaskId, stubInputStream); + assertThat(res.getStatus(), equalTo(NO_CONTENT.getStatusCode())); + } + + @Test + public void testPUTTaskRetryValueWorks() + throws NamespaceException, ResourceNotFoundException, DataAccessException { + + final String BODY = "retry-value=0"; + InputStream stubInputStream = IOUtils.toInputStream(BODY); + UUID groupTaskId = UUID.randomUUID(); + TaskResource recallEndpoint = getTaskResource(getResourceService(STORI), + getTapeRecallCatalogGroupTaskId(groupTaskId, true)); + Response res = recallEndpoint.putNewTaskStatusOrRetryValue(groupTaskId, stubInputStream); + assertThat(res.getStatus(), equalTo(NO_CONTENT.getStatusCode())); + } + + @Test + public void testPUTTaskStatusOnTaskIdNotFound() + throws NamespaceException, ResourceNotFoundException, DataAccessException { + + final String BODY = "status=0"; + InputStream stubInputStream = IOUtils.toInputStream(BODY); + UUID groupTaskId = UUID.randomUUID(); + TaskResource recallEndpoint = getTaskResource(getResourceService(STORI), + getTapeRecallCatalogGroupTaskId(groupTaskId, false)); + Response res = recallEndpoint.putNewTaskStatusOrRetryValue(groupTaskId, stubInputStream); + assertThat(res.getStatus(), equalTo(NOT_FOUND.getStatusCode())); + assertThat(res.getEntity().toString(), equalTo("No Recall Group Task found with ID = '" + groupTaskId + "'")); + } + + @Test + public void testPUTTaskStatusWithWrongKeyInBody() + throws NamespaceException, ResourceNotFoundException, DataAccessException { + + final String BODY = "wrong=0"; + InputStream stubInputStream = IOUtils.toInputStream(BODY); + UUID groupTaskId = UUID.randomUUID(); + TaskResource recallEndpoint = getTaskResource(getResourceService(STORI), + getTapeRecallCatalogGroupTaskId(groupTaskId, true)); + Response res = recallEndpoint.putNewTaskStatusOrRetryValue(groupTaskId, stubInputStream); + assertThat(res.getStatus(), equalTo(BAD_REQUEST.getStatusCode())); + assertThat(res.getEntity().toString(), equalTo("Invalid body '[" + BODY + "]'")); + } + + @Test + public void testPUTTaskStatusWithWrongValueInBody() + throws NamespaceException, ResourceNotFoundException, DataAccessException { + + final String BODY = "status=queued"; + InputStream stubInputStream = IOUtils.toInputStream(BODY); + UUID groupTaskId = UUID.randomUUID(); + TaskResource recallEndpoint = getTaskResource(getResourceService(STORI), + getTapeRecallCatalogGroupTaskId(groupTaskId, true)); + Response res = recallEndpoint.putNewTaskStatusOrRetryValue(groupTaskId, stubInputStream); + assertThat(res.getStatus(), equalTo(BAD_REQUEST.getStatusCode())); + assertThat(res.getEntity().toString(), equalTo("Invalid body '[" + BODY + "]'")); + } + + @Test + public void testPUTTaskStatusWithBothStatusAndRetryValuesInBody() + throws NamespaceException, ResourceNotFoundException, DataAccessException { + + final String BODY = "status=0\nretry-value=2"; + final String BODYSTR = "status=0, retry-value=2"; + InputStream stubInputStream = IOUtils.toInputStream(BODY); + UUID groupTaskId = UUID.randomUUID(); + TaskResource recallEndpoint = getTaskResource(getResourceService(STORI), + getTapeRecallCatalogGroupTaskId(groupTaskId, true)); + Response res = recallEndpoint.putNewTaskStatusOrRetryValue(groupTaskId, stubInputStream); + assertThat(res.getStatus(), equalTo(BAD_REQUEST.getStatusCode())); + assertThat(res.getEntity().toString(), equalTo("Invalid body '[" + BODYSTR + "]'")); + + } }