Skip to content

Commit

Permalink
Merge pull request #73 from italiangrid/develop
Browse files Browse the repository at this point in the history
StoRM Backend 1.11.10 release to master
  • Loading branch information
enricovianello committed Jan 22, 2016
2 parents e1218bc + 4ffa389 commit 82c2688
Show file tree
Hide file tree
Showing 21 changed files with 831 additions and 721 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<name>StoRM Backend server</name>
<groupId>org.italiangrid.storm</groupId>
<artifactId>storm-backend-server</artifactId>
<version>1.11.9</version>
<version>1.11.10</version>

<properties>

Expand Down
213 changes: 7 additions & 206 deletions src/main/java/it/grid/storm/catalogs/PtPChunkCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,12 @@
import it.grid.storm.srm.types.TSpaceToken;
import it.grid.storm.srm.types.TStatusCode;
import it.grid.storm.srm.types.TTURL;
import it.grid.storm.synchcall.command.datatransfer.PutDoneCommand;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -93,33 +90,7 @@ public class PtPChunkCatalog {
*/
private PtPChunkCatalog() {

TimerTask transitTask = new TimerTask() {

@Override
public void run() {

List<Long> ids = getExpiredSRM_SPACE_AVAILABLE();

if (ids.isEmpty()) {
return;
}

Collection<ReducedPtPChunkData> reduced = lookupReducedPtPChunkData(ids
.toArray(new Long[ids.size()]));

if (reduced.isEmpty()) {
log.error("ATTENTION in PtP CHUNK CATALOG! Attempt to handle physical "
+ "files for transited expired entries failed! No data could be "
+ "translated from persitence for PtP Chunks with ID {}", ids);
}
ArrayList<TSURL> surls = new ArrayList<TSURL>(reduced.size());
for (ReducedPtPChunkData data : reduced) {
surls.add(data.toSURL());
}
PutDoneCommand.executePutDone(surls, null);
}
};
transiter.scheduleAtFixedRate(transitTask, delay, period);
transiter.scheduleAtFixedRate(new TransitExpiredPutRequestTimerTask(), delay, period);
}

/**
Expand Down Expand Up @@ -164,26 +135,6 @@ synchronized public void update(PtPPersistentChunkData chunkData) {
dao.update(to);
}

/**
* Method that synchronizes the supplied PtPChunkData with the information
* present in Persistence. BE WARNED: a new object is returned, and the
* original PtPChunkData is left untouched! null is returned in case of any
* error.
*/
synchronized public PtPPersistentChunkData refreshStatus(
PtPPersistentChunkData inputChunk) {

PtPChunkDataTO auxTO = dao.refresh(inputChunk.getPrimaryKey());
log.debug("PtP CHUNK CATALOG refreshStatus: retrieved data {}", auxTO);
if (auxTO == null) {
log.warn("PtP CHUNK CATALOG! Empty TO found in persistence for specified "
+ "request: {}", inputChunk.getPrimaryKey());
return null;
}
PtPPersistentChunkData data = makeOne(auxTO, inputChunk.getRequestToken());
return data;
}

/**
* Method that returns a Collection of PtPChunkData Objects matching the
* supplied TRequestToken. If any of the data associated to the TRequestToken
Expand Down Expand Up @@ -496,22 +447,6 @@ private boolean isComplete(ReducedPtPChunkDataTO reducedChunkTO) {
&& (reducedChunkTO.surlUniqueID() != null);
}

/**
* Method that returns a Collection of ReducedPtPChunkData Objects associated
* to the supplied TRequestToken. If any of the data retrieved for a given
* chunk is not well formed and so does not allow a ReducedPtPChunkData Object
* to be created, then that chunk is dropped and gets logged, while processing
* continues with the next one. All valid chunks get returned: the others get
* dropped. If there are no chunks associated to the given TRequestToken, then
* an empty Collection is returned and a messagge gets logged. All
* ReducedChunks, regardless of their status, are returned.
*/
synchronized public Collection<ReducedPtPChunkData> lookupReducedPtPChunkData(
TRequestToken rt) {

return lookupReducedPtPChunkData(rt, new ArrayList<TSURL>(0));
}

public Collection<ReducedPtPChunkData> lookupReducedPtPChunkData(
TRequestToken requestToken, Collection<TSURL> surls) {

Expand All @@ -521,11 +456,6 @@ public Collection<ReducedPtPChunkData> lookupReducedPtPChunkData(
return buildReducedChunkDataList(reducedChunkDataTOs);
}

public Collection<PtPPersistentChunkData> lookupPtPChunkData(TSURL surl) {

return lookupPtPChunkData((List<TSURL>) Arrays.asList(new TSURL[] { surl }));
}

public Collection<PtPPersistentChunkData> lookupPtPChunkData(TSURL surl,
GridUserInterface user) {

Expand All @@ -550,22 +480,6 @@ private Collection<PtPPersistentChunkData> lookupPtPChunkData(
return buildChunkDataList(chunkDataTOs);
}

public Collection<PtPPersistentChunkData> lookupPtPChunkData(List<TSURL> surls) {

int[] surlsUniqueIDs = new int[surls.size()];
String[] surlsArray = new String[surls.size()];
int index = 0;
for (TSURL tsurl : surls) {
surlsUniqueIDs[index] = tsurl.uniqueId();
surlsArray[index] = tsurl.rawSurl();
index++;
}
Collection<PtPChunkDataTO> chunkDataTOs = dao.find(surlsUniqueIDs,
surlsArray);
log.debug("PtP CHUNK CATALOG: retrieved data {}", chunkDataTOs);
return buildChunkDataList(chunkDataTOs);
}

private Collection<PtPPersistentChunkData> buildChunkDataList(
Collection<PtPChunkDataTO> chunkDataTOs) {

Expand Down Expand Up @@ -622,26 +536,6 @@ private Collection<ReducedPtPChunkData> buildReducedChunkDataList(
return list;
}

/**
* Method that returns a Collection of ReducedPtPChunkData Objects
* corresponding to each of the IDs else { log.debug(
* "PtPChunkDAO! No chunk of PtP request was transited from SRM_SPACE_AVAILABLE to SRM_FILE_LIFETIME_EXPIRED."
* ); }contained inthe supplied List of Long objects. If any of the data
* retrieved for a given chunk is not well formed and so does not allow a
* ReducedPtPChunkData Object to be created, then that chunk is dropped and
* gets logged, while processing continues with the next one. All valid chunks
* get returned: the others get dropped. WARNING! If there are no chunks
* associated to any of the given IDs, no messagge gets written in the logs!
*/
synchronized public Collection<ReducedPtPChunkData> lookupReducedPtPChunkData(
Long[] volids) {

Collection<ReducedPtPChunkDataTO> reducedChunkDataTOs = dao
.findReduced(Arrays.asList(volids));
log.debug("PtP CHUNK CATALOG: fetched data {}", reducedChunkDataTOs);
return buildReducedChunkDataList(reducedChunkDataTOs);
}

private ReducedPtPChunkData makeOneReduced(
ReducedPtPChunkDataTO reducedChunkDataTO) {

Expand Down Expand Up @@ -709,106 +603,22 @@ private ReducedPtPChunkData makeOneReduced(
return aux;
}

/**
* Method used to establish if in Persistence there is a PtPChunkData working
* on the supplied SURL, and whose state is SRM_SPACE_AVAILABLE, in which case
* true is returned. In case none are found or there is any problem, false is
* returned.
*/
synchronized public boolean isSRM_SPACE_AVAILABLE(TSURL surl) {

return (dao.numberInSRM_SPACE_AVAILABLE(surl.uniqueId()) > 0);
}

/**
* Method used to force transition to SRM_SUCCESS from SRM_SPACE_AVAILABLE, of
* all PtP Requests whose pinLifetime has expired and the state still has not
* been changed (a user forgot to run srmPutDone)! The method returns a List
* containing all ids of transited chunks that are also Volatile.
*/
synchronized public List<Long> getExpiredSRM_SPACE_AVAILABLE() {

return dao.getExpiredSRM_SPACE_AVAILABLE();
}

/**
* Method used to transit the specified Collection of ReducedPtPChunkData of
* the request identified by the supplied TRequestToken, from
* SRM_SPACE_AVAILABLE to SRM_SUCCESS. Chunks in any other starting state are
* not transited. <code>null</code> entries in the collection are permitted
* and skipped. In case of any error nothing is done, but proper error
* messages get logged.
*/
synchronized public void transitSRM_SPACE_AVAILABLEtoSRM_SUCCESS(
// Collection<ReducedPtPChunkData> chunks) {
TRequestToken token, List<TSURL> surls) {

if (token == null || surls == null || surls.size() == 0) {
return;
}
Collection<ReducedPtPChunkDataTO> tos = dao.findReduced(token.getValue(),
null);
LinkedList<Long> primaryKeys = new LinkedList<Long>();
for (ReducedPtPChunkDataTO to : tos) {
for (TSURL surl : surls) {
if (to.toSURL().equals(surl)) {
primaryKeys.add(to.primaryKey());
break;
}
}
}
dao.transitSRM_SPACE_AVAILABLEtoSRM_SUCCESS(primaryKeys);
}

/**
* This method is intended to be used by srmRm to transit all PtP chunks on
* the given SURL which are in the SRM_SPACE_AVAILABLE state, to SRM_ABORTED.
* The supplied String will be used as explanation in those chunks return
* status. The global status of the request is not changed. The TURL of those
* requests will automatically be set to empty. Notice that both
* removeAllJit(SURL) and removeVolatile(SURL) are automatically invoked on
* PinnedFilesCatalog, to remove any entry and corresponding physical ACLs.
* Beware, that the chunks may be part of requests that have finished, or that
* still have not finished because other chunks are being processed.
*/
synchronized public void transitSRM_SPACE_AVAILABLEtoSRM_ABORTED(TSURL surl,
String explanation) {

if (surl == null) {
log
.error("ATTENTION in PtP CHUNK CATALOG! Attempt to invoke transitSRM_SPACE_AVAILABLEtoSRM_ABORTED with a null SURL!");
return;
}
if (explanation == null) {
explanation = "";
}
dao.transitSRM_SPACE_AVAILABLEtoSRM_ABORTED(surl.uniqueId(),
surl.toString(), explanation);
}

public void updateStatus(TRequestToken requestToken, TSURL surl,
public int updateStatus(TRequestToken requestToken, TSURL surl,
TStatusCode statusCode, String explanation) {

dao.updateStatus(requestToken, new int[] { surl.uniqueId() },
return dao.updateStatus(requestToken, new int[] { surl.uniqueId() },
new String[] { surl.rawSurl() }, statusCode, explanation);
}

public void updateStatus(TSURL surl, TStatusCode statusCode,
String explanation) {

dao.updateStatus(new int[] { surl.uniqueId() },
new String[] { surl.rawSurl() }, statusCode, explanation);
}

public void updateFromPreviousStatus(TRequestToken requestToken,
public int updateFromPreviousStatus(TRequestToken requestToken,
TStatusCode expectedStatusCode, TStatusCode newStatusCode,
String explanation) {

dao.updateStatusOnMatchingStatus(requestToken, expectedStatusCode,
return dao.updateStatusOnMatchingStatus(requestToken, expectedStatusCode,
newStatusCode, explanation);
}

public void updateFromPreviousStatus(TRequestToken requestToken,
public int updateFromPreviousStatus(TRequestToken requestToken,
List<TSURL> surlList, TStatusCode expectedStatusCode,
TStatusCode newStatusCode) {

Expand All @@ -820,17 +630,8 @@ public void updateFromPreviousStatus(TRequestToken requestToken,
surls[index] = tsurl.rawSurl();
index++;
}
dao.updateStatusOnMatchingStatus(requestToken, surlsUniqueIDs, surls,
return dao.updateStatusOnMatchingStatus(requestToken, surlsUniqueIDs, surls,
expectedStatusCode, newStatusCode);
}

public void updateFromPreviousStatus(TSURL surl,
TStatusCode expectedStatusCode, TStatusCode newStatusCode,
String explanation) {

dao.updateStatusOnMatchingStatus(new int[] { surl.uniqueId() },
new String[] { surl.rawSurl() }, expectedStatusCode, newStatusCode,
explanation);
}

}
Loading

0 comments on commit 82c2688

Please sign in to comment.