diff --git a/pom.xml b/pom.xml
index d8705e6..6866d77 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,7 +4,7 @@
4.0.0
DVUploader
DVUploader
- 1.0.9
+ 1.1.0
UTF-8
diff --git a/src/main/java/org/sead/uploader/AbstractUploader.java b/src/main/java/org/sead/uploader/AbstractUploader.java
index 176b6cd..593ab60 100644
--- a/src/main/java/org/sead/uploader/AbstractUploader.java
+++ b/src/main/java/org/sead/uploader/AbstractUploader.java
@@ -22,6 +22,7 @@
import java.io.PrintWriter;
import java.net.MalformedURLException;
import java.net.URL;
+import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -104,6 +105,13 @@ public static void println(String s) {
}
return;
}
+ static DecimalFormat decimalFormat = new DecimalFormat("#.00");
+
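+ // Rewrites the current console line (via \r) to show percent progress during long uploads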
+ public static void printStatus(float s) {
+ System.out.print("\rProgress: " + decimalFormat.format(s * 100) + "%");
+ System.out.flush();
+ return;
+ }
public void parseArgs(String[] args) {
diff --git a/src/main/java/org/sead/uploader/dataverse/DVUploader.java b/src/main/java/org/sead/uploader/dataverse/DVUploader.java
index 02b77af..201e6c4 100644
--- a/src/main/java/org/sead/uploader/dataverse/DVUploader.java
+++ b/src/main/java/org/sead/uploader/dataverse/DVUploader.java
@@ -23,6 +23,10 @@
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -31,30 +35,34 @@
import org.apache.commons.codec.binary.Hex;
import org.apache.http.HttpEntity;
-import org.apache.http.HttpHeaders;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.config.CookieSpecs;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
-import org.apache.http.client.methods.HttpUriRequest;
-import org.apache.http.client.methods.RequestBuilder;
import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.config.Registry;
+import org.apache.http.config.RegistryBuilder;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.http.conn.socket.PlainConnectionSocketFactory;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.conn.ssl.TrustAllStrategy;
import org.apache.http.entity.InputStreamEntity;
+import org.apache.http.entity.StringEntity;
import org.apache.http.entity.mime.MultipartEntityBuilder;
import org.apache.http.entity.mime.content.ContentBody;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.util.EntityUtils;
import org.json.JSONArray;
import org.json.JSONObject;
import org.sead.uploader.AbstractUploader;
+import static org.sead.uploader.AbstractUploader.println;
import org.sead.uploader.util.UploaderException;
import org.sead.uploader.util.Resource;
@@ -73,6 +81,22 @@ public class DVUploader extends AbstractUploader {
private static boolean directUpload = false;
private static boolean trustCerts = false;
+ private int timeout = 1200;
+ private int httpConcurrency = 4;
+
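+ // Direct uploads larger than this many bytes switch to multipart upload; override with -mpsize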
+ private static long mpSizeLimit = 5 * 1024 * 1024;
+ private RequestConfig config = RequestConfig.custom()
+ .setConnectTimeout(timeout * 1000)
+ .setConnectionRequestTimeout(timeout * 1000)
+ .setSocketTimeout(timeout * 1000)
+ .setCookieSpec(CookieSpecs.STANDARD)
+ .setExpectContinueEnabled(true)
+ .build();
+
+ private static HttpClientContext localContext = HttpClientContext.create();
+
+ private PoolingHttpClientConnectionManager cm = null;
+
public static void main(String args[]) throws Exception {
setUploader(new DVUploader());
@@ -100,7 +124,7 @@ public static void main(String args[]) throws Exception {
private static void usage() {
println("\nUsage:");
- println(" java -jar DVUploader-1.0.1.jar -server= -key= -did= ");
+ println(" java -jar DVUploader-1.1.0.jar -server= -key= -did= ");
println("\n where:");
println(" = the URL of the server to upload to, e.g. https://datverse.tdl.org");
@@ -109,6 +133,7 @@ private static void usage() {
println(" = a space separated list of files to upload or directory name(s) where the files to upload are");
println("\n Optional Arguments:");
println(" -directupload - Use Dataverse's direct upload capability to send files directly to their final location (only works if this is enabled on the server)");
+ println(" -mpsize - with direct upload, this is the max size before switching to multipart uploads (Note actual part size is server controlled)");
println(" -listonly - Scan the Dataset and local files and list what would be uploaded (does not upload with this flag)");
println(" -limit= - Specify a maximum number of files to upload per invocation.");
println(" -verify - Check both the file name and checksum in comparing with current Dataset entries.");
@@ -139,6 +164,14 @@ public boolean parseCustomArg(String arg) {
directUpload = true;
println("Will use direct upload of files (if configured on this server)");
return true;
+ } else if (arg.startsWith("-mpsize")) {
+ try {
+ mpSizeLimit = Long.parseLong(arg.substring(arg.indexOf(argSeparator) + 1));
+ } catch (NumberFormatException nfe) {
+ println("Unable to parse -mpsize as long, using : " + mpSizeLimit);
+ }
+ println("Will use multipart upload for direct upload files over " + mpSizeLimit + " bytes");
+ return true;
} else if (arg.equals("-trustall")) {
trustCerts = true;
println("Will trust all certificates");
@@ -162,31 +195,12 @@ public HttpClientContext authenticate() {
public CloseableHttpClient getSharedHttpClient() {
if (httpclient == null) {
- // use the TrustSelfSignedStrategy to allow Self Signed Certificates
- SSLContext sslContext;
- SSLConnectionSocketFactory connectionFactory;
try {
- if (trustCerts) {
- sslContext = SSLContextBuilder
- .create()
- .loadTrustMaterial(new TrustAllStrategy())
- .build();
- // create an SSL Socket Factory to use the SSLContext with the trust self signed certificate strategy
- // and allow all hosts verifier.
- connectionFactory = new SSLConnectionSocketFactory(sslContext, NoopHostnameVerifier.INSTANCE);
- // finally create the HttpClient using HttpClient factory methods and assign the ssl socket factory
- httpclient = HttpClients
- .custom()
- .setSSLSocketFactory(connectionFactory)
- .setDefaultRequestConfig(RequestConfig.custom().setCookieSpec(CookieSpecs.STANDARD).setExpectContinueEnabled(true).build())
- .build();
- } else {
- httpclient = HttpClients
- .custom()
- .setDefaultRequestConfig(RequestConfig.custom().setCookieSpec(CookieSpecs.STANDARD).setExpectContinueEnabled(true).build())
- .build();
-
- }
+ initHttpPool();
+ httpclient = HttpClients.custom()
+ .setConnectionManager(cm)
+ .setDefaultRequestConfig(config)
+ .build();
} catch (NoSuchAlgorithmException | KeyStoreException | KeyManagementException ex) {
Logger.getLogger(DVUploader.class.getName()).log(Level.SEVERE, null, ex);
@@ -388,172 +402,18 @@ protected String uploadDatafile(Resource file, String path) {
httpclient = getSharedHttpClient();
}
String dataId = null;
- int retry = 10;
+ int retries = 10;
if (directUpload) {
- while (retry > 0) {
-
- try {
-
- // Now post data
- String urlString = server + "/api/datasets/:persistentId/uploadsid";
- urlString = urlString + "?persistentId=doi:" + datasetPID.substring(4) + "&key=" + apiKey;
- HttpGet httpget = new HttpGet(urlString);
- CloseableHttpResponse response = httpclient.execute(httpget, getLocalContext());
- try {
- int status = response.getStatusLine().getStatusCode();
- String uploadUrl = null;
- String jsonResponse = null;
- HttpEntity resEntity = response.getEntity();
- if (resEntity != null) {
- jsonResponse = EntityUtils.toString(resEntity);
- }
- if (status == 200) {
- JSONObject data = (new JSONObject(jsonResponse)).getJSONObject("data");
- uploadUrl = data.getString("url");
- String storageIdentifier = data.getString("storageIdentifier");
-
- HttpPut httpput = new HttpPut(uploadUrl);
-
-
- httpput.addHeader("x-amz-tagging", "dv-state=temp");
- MessageDigest messageDigest = MessageDigest.getInstance("MD5");
-
- try (InputStream inStream = file.getInputStream(); DigestInputStream digestInputStream = new DigestInputStream(inStream, messageDigest)) {
- // This is hte new form for requests - keeping the example but won't update until we can change all
- //HttpUriRequest httpput = RequestBuilder.put()
- // .setUri(uploadUrl)
- // .setHeader("x-amz-tagging", "dv-state=temp")
- // .setEntity(new InputStreamEntity(digestInputStream, file.length()))
- // .build();
- httpput.setEntity(new InputStreamEntity(digestInputStream, file.length()));
- CloseableHttpResponse putResponse = httpclient.execute(httpput);
- try {
- int putStatus = putResponse.getStatusLine().getStatusCode();
- String putRes = null;
- HttpEntity putEntity = putResponse.getEntity();
- if (putEntity != null) {
- putRes = EntityUtils.toString(putEntity);
- }
- if (putStatus == 200) {
- String localchecksum = Hex.encodeHexString(digestInputStream.getMessageDigest().digest());
- // Now post data
- urlString = server + "/api/datasets/:persistentId/add";
- urlString = urlString + "?persistentId=" + datasetPID + "&key=" + apiKey;
- HttpPost httppost = new HttpPost(urlString);
-
- // ContentBody bin = file.getContentBody();
- MultipartEntityBuilder meb = MultipartEntityBuilder.create();
- // meb.addPart("file", bin);
- String jsonData = "{\"storageIdentifier\":\"" + storageIdentifier + "\",\"fileName\":\""
- + file.getName() + "\",\"mimeType\":\"" + file.getMimeType() + "\",\"md5Hash\":\"" + localchecksum + "\",\"fileSize\":\"" + file.length() + "\"";
- if (recurse) {
- // Dataverse takes paths without an initial / and ending without a /
- // with the path not including the file name
- if (path.substring(1).contains("/")) {
- String parentPath = path.substring(1, path.lastIndexOf("/"));
- jsonData = jsonData
- + (!parentPath.isEmpty() ? ",\"directoryLabel\":\"" + parentPath + "\"}"
- : "}");
-
- }
- } else {
- jsonData = jsonData + "}";
- }
- meb.addTextBody("jsonData", jsonData);
-
- HttpEntity reqEntity = meb.build();
- httppost.setEntity(reqEntity);
-
- CloseableHttpResponse postResponse = httpclient.execute(httppost, getLocalContext());
- try {
- int postStatus = postResponse.getStatusLine().getStatusCode();
- String postRes = null;
- HttpEntity postEntity = postResponse.getEntity();
- if (postEntity != null) {
- postRes = EntityUtils.toString(postEntity);
- }
- if (postStatus == 200) {
- JSONObject checksum = (new JSONObject(postRes)).getJSONObject("data")
- .getJSONArray("files").getJSONObject(0).getJSONObject("dataFile")
- .getJSONObject("checksum");
- dataId = checksum.getString("type") + ":" + checksum.getString("value");
- retry = 0;
- int total = 0;
- // For new servers, wait up to maxWaitTime for a dataset lock to expire.
- while (isLocked() && (total < maxWaitTime)) {
- TimeUnit.SECONDS.sleep(1);
- total = total + 1;
- }
- } else if (status == 400 && oldServer) {
- // If the call to the lock API fails in isLocked(), oldServer will be set to
- // true and
- // all we can do for a lock is to keep retrying.
- // Unfortunately, the error messages are configurable, so there's no guaranteed
- // way to detect
- // locks versus other conditions (e.g. file exists), so we can test for unique
- // words in the default messages
- if ((postRes != null) && postRes.contains("lock")) {
- retry--;
- } else {
- println("Error response when processing " + file.getAbsolutePath() + " : "
- + response.getStatusLine().getReasonPhrase());
- // A real error: e.g. This file already exists in the dataset.
- if (postRes != null) {
- println(postRes);
- }
- // Skip
- retry = 0;
- }
- } else {
- // An error and unlikely that we can recover, so report and move on.
- println("Error response when processing " + file.getAbsolutePath() + " : "
- + response.getStatusLine().getReasonPhrase());
- if (postRes != null) {
- println(postRes);
- }
- retry = 0;
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- retry = 0;
- } finally {
- EntityUtils.consumeQuietly(response.getEntity());
- }
- } else {
- println("Upload failed with status: " + putStatus + " (skipping)");
- retry=0;
- }
-
- } catch (IOException e) {
- e.printStackTrace(System.out);
- println("Error processing POST to Dataverse" + file.getAbsolutePath() + " : " + e.getMessage());
- retry = 0;
- }
- }
- } else {
- if (status > 500) {
- retry--;
- } else {
- retry = 0;
- }
- }
-
- } catch (IOException e) {
- e.printStackTrace(System.out);
- println("Error processing file upload " + file.getAbsolutePath() + " : " + e.getMessage());
- retry = 0;
- } catch (NoSuchAlgorithmException e1) {
- // TODO Auto-generated catch block
- e1.printStackTrace();
- }
-
- } catch (IOException e) {
- println("Error processing request for storage id" + file.getAbsolutePath() + " : " + e.getMessage());
- retry = 0;
- }
+ try {
+ // Deprecated - used with Dataverse v4.20: dataId = directFileUpload(file, path, retries);
+ dataId = multipartDirectFileUpload(file, path, retries);
+ } catch (IOException e) {
+ println("Error processing request for storage id" + file.getAbsolutePath() + " : " + e.getMessage());
+ retries = 0;
}
+
} else {
- while (retry > 0) {
+ while (retries > 0) {
try {
// Now post data
@@ -589,7 +449,7 @@ protected String uploadDatafile(Resource file, String path) {
JSONObject checksum = (new JSONObject(res)).getJSONObject("data").getJSONArray("files")
.getJSONObject(0).getJSONObject("dataFile").getJSONObject("checksum");
dataId = checksum.getString("type") + ":" + checksum.getString("value");
- retry = 0;
+ retries = 0;
int total = 0;
// For new servers, wait up to maxWaitTime for a dataset lock to expire.
while (isLocked() && (total < maxWaitTime)) {
@@ -605,7 +465,7 @@ protected String uploadDatafile(Resource file, String path) {
// locks versus other conditions (e.g. file exists), so we can test for unique
// words in the default messages
if ((res != null) && res.contains("lock")) {
- retry--;
+ retries--;
} else {
println("Error response when processing " + file.getAbsolutePath() + " : "
+ response.getStatusLine().getReasonPhrase());
@@ -614,7 +474,7 @@ protected String uploadDatafile(Resource file, String path) {
println(res);
}
// Skip
- retry = 0;
+ retries = 0;
}
} else {
// An error and unlikely that we can recover, so report and move on.
@@ -623,18 +483,18 @@ protected String uploadDatafile(Resource file, String path) {
if (res != null) {
println(res);
}
- retry = 0;
+ retries = 0;
}
} catch (InterruptedException e) {
e.printStackTrace();
- retry = 0;
+ retries = 0;
} finally {
EntityUtils.consumeQuietly(response.getEntity());
}
} catch (IOException e) {
println("Error processing " + file.getAbsolutePath() + " : " + e.getMessage());
- retry = 0;
+ retries = 0;
}
}
}
@@ -678,4 +538,379 @@ private boolean isLocked() {
}
return false;
}
+
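+ // Legacy direct upload (Dataverse v4.20 /uploadsid API): streams the whole file through a single presigned PUT, then registers it with the dataset.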
+ private String directFileUpload(Resource file, String path, int retries) throws IOException {
+ String dataId = null;
+ while (retries > 0) {
+ // Now post data
+ String urlString = server + "/api/datasets/:persistentId/uploadsid";
+ urlString = urlString + "?persistentId=doi:" + datasetPID.substring(4) + "&key=" + apiKey;
+ HttpGet httpget = new HttpGet(urlString);
+ CloseableHttpResponse response = httpclient.execute(httpget, getLocalContext());
+ try {
+ int status = response.getStatusLine().getStatusCode();
+ String uploadUrl = null;
+ String jsonResponse = null;
+ HttpEntity resEntity = response.getEntity();
+ if (resEntity != null) {
+ jsonResponse = EntityUtils.toString(resEntity);
+ }
+ if (status == 200) {
+ JSONObject data = (new JSONObject(jsonResponse)).getJSONObject("data");
+ uploadUrl = data.getString("url");
+ String storageIdentifier = data.getString("storageIdentifier");
+
+ HttpPut httpput = new HttpPut(uploadUrl);
+
+ httpput.addHeader("x-amz-tagging", "dv-state=temp");
+ MessageDigest messageDigest = MessageDigest.getInstance("MD5");
+
+ try (InputStream inStream = file.getInputStream(); DigestInputStream digestInputStream = new DigestInputStream(inStream, messageDigest)) {
+ // This is the new form for requests - keeping the example but won't update until we can change all
+ //HttpUriRequest httpput = RequestBuilder.put()
+ // .setUri(uploadUrl)
+ // .setHeader("x-amz-tagging", "dv-state=temp")
+ // .setEntity(new InputStreamEntity(digestInputStream, file.length()))
+ // .build();
+ httpput.setEntity(new InputStreamEntity(digestInputStream, file.length()));
+ CloseableHttpResponse putResponse = httpclient.execute(httpput);
+ try {
+ int putStatus = putResponse.getStatusLine().getStatusCode();
+ String putRes = null;
+ HttpEntity putEntity = putResponse.getEntity();
+ if (putEntity != null) {
+ putRes = EntityUtils.toString(putEntity);
+ }
+ if (putStatus == 200) {
+ String localchecksum = Hex.encodeHexString(digestInputStream.getMessageDigest().digest());
+ dataId = registerFileWithDataverse(file, path, storageIdentifier, localchecksum, retries);
+ retries = 0;
+ }
+ } catch (IOException e) {
+ e.printStackTrace(System.out);
+ println("Error processing POST to Dataverse" + file.getAbsolutePath() + " : " + e.getMessage());
+ retries = 0;
+ }
+ }
+ } else {
+ if (status > 500) {
+ retries--;
+ } else {
+ retries = 0;
+ }
+ }
+
+ } catch (IOException e) {
+ e.printStackTrace(System.out);
+ println("Error processing file upload " + file.getAbsolutePath() + " : " + e.getMessage());
+ retries = 0;
+ } catch (NoSuchAlgorithmException e1) {
+ println("Checksum Algoritm not found: " + e1.getLocalizedMessage());
+ }
+ }
+ return dataId;
+ }
+
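+ // Direct upload via the /uploadurls API: for files at or below the server's part size Dataverse
+ // returns one presigned URL; for larger files it returns per-part presigned URLs that are
+ // uploaded in parallel and then completed (or aborted) through the returned convenience URLs.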
+ private String multipartDirectFileUpload(Resource file, String path, int retries) throws IOException {
+ String dataId = null;
+ while (retries > 0) {
+ // Start multipart upload with a call to Dataverse. It will make a call to S3 to start the multipart upload and will return a set of presigned Urls for us to upload the parts
+ String urlString = server + "/api/datasets/:persistentId/uploadurls";
+ urlString = urlString + "?persistentId=doi:" + datasetPID.substring(4) + "&key=" + apiKey + "&size=" + file.length();
+ HttpGet httpget = new HttpGet(urlString);
+ CloseableHttpResponse response = httpclient.execute(httpget, getLocalContext());
+ try {
+ int status = response.getStatusLine().getStatusCode();
+
+ String jsonResponse = null;
+ HttpEntity resEntity = response.getEntity();
+ if (resEntity != null) {
+ jsonResponse = EntityUtils.toString(resEntity);
+ }
+ if (status == 200) {
+ println(jsonResponse);
+ JSONObject uploadResponse = (new JSONObject(jsonResponse)).getJSONObject("data");
+ //Along with the parts, which should be listed numerically, we get convenience URLs to call on Dataverse to abort or complete the multipart upload
+ //backwards compatibility: older servers don't report partSize, so default to 5 MB
+ long maxPartSize = 5 * 1024 * 1024L;
+ if (uploadResponse.has("partSize")) {
+ maxPartSize = uploadResponse.getLong("partSize");
+ }
+ if (uploadResponse.has("url")) {
+ String storageIdentifier = uploadResponse.getString("storageIdentifier");
+ String uploadUrl = uploadResponse.getString("url");
+ HttpPut httpput = new HttpPut(uploadUrl);
+
+ httpput.addHeader("x-amz-tagging", "dv-state=temp");
+ try {
+ MessageDigest messageDigest = MessageDigest.getInstance("MD5");
+
+ try (InputStream inStream = file.getInputStream(); DigestInputStream digestInputStream = new DigestInputStream(inStream, messageDigest)) {
+ // This is the new form for requests - keeping the example but won't update until we can change all
+ //HttpUriRequest httpput = RequestBuilder.put()
+ // .setUri(uploadUrl)
+ // .setHeader("x-amz-tagging", "dv-state=temp")
+ // .setEntity(new InputStreamEntity(digestInputStream, file.length()))
+ // .build();
+ httpput.setEntity(new InputStreamEntity(digestInputStream, file.length()));
+ CloseableHttpResponse putResponse = httpclient.execute(httpput);
+ try {
+ int putStatus = putResponse.getStatusLine().getStatusCode();
+ String putRes = null;
+ HttpEntity putEntity = putResponse.getEntity();
+ if (putEntity != null) {
+ putRes = EntityUtils.toString(putEntity);
+ }
+ if (putStatus == 200) {
+ String localchecksum = Hex.encodeHexString(digestInputStream.getMessageDigest().digest());
+ dataId = registerFileWithDataverse(file, path, storageIdentifier, localchecksum, retries);
+ if (dataId != null) {
+ retries = 0;
+ } else {
+ println("Failure registering " + file.getName() + " with Dataverse");
+ }
+ }
+ } catch (IOException e) {
+ e.printStackTrace(System.out);
+ println("Error processing POST to Dataverse" + file.getAbsolutePath() + " : " + e.getMessage());
+ retries = 0;
+ }
+ }
+
+ } catch (NoSuchAlgorithmException nsae) {
+ println("MD5 algorithm not found: " + nsae.getMessage());
+ }
+ } else {
+
+ String abortUrl = uploadResponse.getString("abort");
+ String completeUrl = uploadResponse.getString("complete");
+ JSONObject uploadUrls = uploadResponse.getJSONObject("urls");
+
+ //And we're sent the storageIdentifier - not strictly needed since the storageIdentifier is already in all the URLS where it's needed.
+ String storageIdentifier = uploadResponse.getString("storageIdentifier");
+
+ //Queue up all parts so that each part can be uploaded via a different thread.
+ //Using httpConcurrency number of threads to match the number of open HTTP calls to the S3 server that we're allowed in the HttpClient pool
+ ExecutorService executor = Executors.newFixedThreadPool(httpConcurrency);
+
+ //Give the HttpPartUploadJob class the common info it needs to send parts to S3
+ HttpPartUploadJob.setHttpClient(getSharedHttpClient());
+ HttpPartUploadJob.setHttpClientContext(getLocalContext());
+ HttpPartUploadJob.setPartSize(maxPartSize);
+
+ //Create a map to store the eTags from the parts and the md5 calculated for the whole file
+ Map<String, String> mpUploadInfoMap = new HashMap<String, String>(uploadUrls.length() + 1);
+ //Setup a job to calculate the md5 hash of the file
+ //Probably helpful to have it run in parallel, but it could be a pre or post step as well. If the network is fast relative to disk, we may want the executor to use one extra thread for this
+ MD5Job mjob = new MD5Job(file, mpUploadInfoMap);
+ executor.execute(mjob);
+
+ //Now set up upload jobs for each part
+ int i = 1;
+ long remainingSize = file.length();
+ while (uploadUrls.has(Integer.toString(i))) {
+ //Calculate part size
+ long partSize = maxPartSize;
+ if (remainingSize < maxPartSize) {
+ partSize = remainingSize;
+ }
+ remainingSize -= partSize;
+ println("Creating job for " + partSize + " bytes");
+ HttpPartUploadJob uj = new HttpPartUploadJob(i, uploadUrls.getString(Integer.toString(i)), file, partSize, mpUploadInfoMap);
+
+ executor.execute(uj);
+ i++;
+ }
+ float total = (float) (i - 1);
+ println("All " + (i - 1) + " parts for: " + storageIdentifier + " queued.");
+ //Tell the executor that there are no more jobs coming
+ executor.shutdown();
+ //And wait until it finishes the ones that are running
+ try {
+ while (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
+ printStatus(mpUploadInfoMap.size() / total);
+ }
+ } catch (InterruptedException e) {
+ println("Upload of " + storageIdentifier + " interrupted: " + e.getMessage());
+ //Call abort?
+ }
+ boolean fileUploadComplete = true;
+
+ for (String part : (Set<String>) uploadUrls.keySet()) {
+ if (!mpUploadInfoMap.containsKey(part)) {
+ fileUploadComplete = false;
+ break;
+ }
+ }
+ //Technically, the uploads to S3 could succeed and only the md5 fails, but in this case we still want to abort the MP Upload, not complete it.
+ if (!mpUploadInfoMap.containsKey("md5")) {
+ fileUploadComplete = false;
+ }
+ if (fileUploadComplete) {
+ println("Part uploads Completed for " + storageIdentifier);
+ HttpPut completeUpload = new HttpPut(server + completeUrl + "&key=" + apiKey);
+ JSONObject eTags = new JSONObject();
+ for (String partNo : (Set<String>) mpUploadInfoMap.keySet()) {
+ if (!partNo.equals("md5")) {
+ eTags.put(partNo, mpUploadInfoMap.get(partNo));
+ }
+ }
+ StringEntity body = new StringEntity(eTags.toString());
+ println("ETags: " + eTags.toString());
+ completeUpload.setEntity(body);
+ completeUpload.setHeader("Content-type", "application/json");
+
+ response = httpclient.execute(completeUpload, getLocalContext());
+ EntityUtils.consumeQuietly(response.getEntity());
+ status = response.getStatusLine().getStatusCode();
+ if (status == 200) {
+ println("Successful upload of " + file.getAbsolutePath());
+ dataId = registerFileWithDataverse(file, path, storageIdentifier, mpUploadInfoMap.get("md5"), retries);
+ } else {
+ println("Partial upload of " + file.getAbsolutePath() + ", complete upload failed with status: " + status);
+ }
+
+ retries = 0;
+ } else {
+ HttpDelete delete = new HttpDelete(server + abortUrl + "&key=" + apiKey);
+ response = httpclient.execute(delete, getLocalContext());
+ EntityUtils.consumeQuietly(response.getEntity());
+ status = response.getStatusLine().getStatusCode();
+ if (status != 204) {
+ println("Call to " + abortUrl + " failed with status: " + status);
+ retries = 0;
+ } else {
+ println("Upload of " + file.getAbsolutePath() + " failed and upload request successfully aborted.");
+ println("Upload of large files is not automatically retried - run again to retry this file upload.");
+ retries = 0;
+ }
+ }
+ }
+ } else {
+ println("Retrying: return status was : " + status);
+ retries--;
+ }
+ } catch (IOException e) {
+ e.printStackTrace(System.out);
+ println("Error processing file upload " + file.getAbsolutePath() + " : " + e.getMessage());
+ retries = 0;
+ }
+ }
+ return dataId;
+ }
+
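+ // Registers a file already stored by direct upload with the dataset via the /api/datasets/:persistentId/add call.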
+ private String registerFileWithDataverse(Resource file, String path, String storageIdentifier, String checksum, int retries) {
+ String dataId = null;
+ // Now post data
+ String urlString = server + "/api/datasets/:persistentId/add";
+ urlString = urlString + "?persistentId=" + datasetPID + "&key=" + apiKey;
+ while (retries > 0) {
+ HttpPost httppost = new HttpPost(urlString);
+
+ // ContentBody bin = file.getContentBody();
+ MultipartEntityBuilder meb = MultipartEntityBuilder.create();
+ // meb.addPart("file", bin);
+ String jsonData = "{\"storageIdentifier\":\"" + storageIdentifier + "\",\"fileName\":\""
+ + file.getName() + "\",\"mimeType\":\"" + file.getMimeType() + "\",\"md5Hash\":\"" + checksum + "\",\"fileSize\":\"" + file.length() + "\"";
+ if (recurse && path.substring(1).contains("/")) {
+ // Dataverse takes paths without an initial / and ending without a /
+ // with the path not including the file name
+ String parentPath = path.substring(1, path.lastIndexOf("/"));
+ if (!parentPath.isEmpty()) {
+ jsonData = jsonData + ",\"directoryLabel\":\"" + parentPath + "\"";
+ }
+ }
+ // Close the JSON object in every case
+ jsonData = jsonData + "}";
+ meb.addTextBody("jsonData", jsonData);
+
+ HttpEntity reqEntity = meb.build();
+ httppost.setEntity(reqEntity);
+ try {
+ CloseableHttpResponse postResponse = httpclient.execute(httppost, getLocalContext());
+
+ int postStatus = postResponse.getStatusLine().getStatusCode();
+ String postRes = null;
+ HttpEntity postEntity = postResponse.getEntity();
+ if (postEntity != null) {
+ postRes = EntityUtils.toString(postEntity);
+ }
+ if (postStatus == 200) {
+ JSONObject checksumObject = (new JSONObject(postRes)).getJSONObject("data")
+ .getJSONArray("files").getJSONObject(0).getJSONObject("dataFile")
+ .getJSONObject("checksum");
+ dataId = checksumObject.getString("type") + ":" + checksumObject.getString("value");
+ retries = 0;
+ int total = 0;
+ // For new servers, wait up to maxWaitTime for a dataset lock to expire.
+ while (isLocked() && (total < maxWaitTime)) {
+ TimeUnit.SECONDS.sleep(1);
+ total = total + 1;
+ }
+ } else if (postStatus == 400 && oldServer) {
+ // If the call to the lock API fails in isLocked(), oldServer will be set to
+ // true and
+ // all we can do for a lock is to keep retrying.
+ // Unfortunately, the error messages are configurable, so there's no guaranteed
+ // way to detect
+ // locks versus other conditions (e.g. file exists), so we can test for unique
+ // words in the default messages
+ if ((postRes != null) && postRes.contains("lock")) {
+ retries--;
+ } else {
+ println("Error response when processing " + file.getAbsolutePath() + " : "
+ + postResponse.getStatusLine().getReasonPhrase());
+ // A real error: e.g. This file already exists in the dataset.
+ if (postRes != null) {
+ println(postRes);
+ }
+ // Skip
+ retries = 0;
+ }
+ } else {
+ // An error and unlikely that we can recover, so report and move on.
+ println("Error response when processing " + file.getAbsolutePath() + " : "
+ + postResponse.getStatusLine().getReasonPhrase());
+ if (postRes != null) {
+ println(postRes);
+ }
+ retries = 0;
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ retries = 0;
+ } catch (IOException ex) {
+ retries = 0;
+ println("Error registering file with dataverse: " + storageIdentifier + " : " + ex.getMessage());
+ }
+ }
+ return dataId;
+ }
+
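+ // Builds the pooled connection manager behind the shared client so that httpConcurrency part uploads can run in parallel.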
+ private void initHttpPool() throws NoSuchAlgorithmException, KeyManagementException, KeyStoreException {
+ if (trustCerts) {
+ // use the TrustAllStrategy to accept self-signed (and any other) certificates
+ SSLContext sslContext;
+ SSLConnectionSocketFactory connectionFactory;
+
+ sslContext = SSLContextBuilder
+ .create()
+ .loadTrustMaterial(new TrustAllStrategy())
+ .build();
+ // create an SSL Socket Factory that uses the SSLContext with the trust-all strategy
+ // and the allow-all hosts verifier.
+ connectionFactory = new SSLConnectionSocketFactory(sslContext, NoopHostnameVerifier.INSTANCE);
+
+ Registry<ConnectionSocketFactory> registry = RegistryBuilder.<ConnectionSocketFactory>create()
+ .register("https", connectionFactory)
+ // also register plain http so -trustall still works against http servers
+ .register("http", PlainConnectionSocketFactory.getSocketFactory()).build();
+ cm = new PoolingHttpClientConnectionManager(registry);
+ } else {
+ cm = new PoolingHttpClientConnectionManager();
+ }
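+ // All part uploads target the same host, so the per-route limit must match httpConcurrency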
+ cm.setDefaultMaxPerRoute(httpConcurrency);
+ cm.setMaxTotal(httpConcurrency > 20 ? httpConcurrency : 20);
+ }
}
diff --git a/src/main/java/org/sead/uploader/dataverse/HttpPartUploadJob.java b/src/main/java/org/sead/uploader/dataverse/HttpPartUploadJob.java
new file mode 100644
index 0000000..6396a26
--- /dev/null
+++ b/src/main/java/org/sead/uploader/dataverse/HttpPartUploadJob.java
@@ -0,0 +1,107 @@
+/*
+ * To change this license header, choose License Headers in Project Properties.
+ * To change this template file, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package org.sead.uploader.dataverse;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.entity.InputStreamEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.util.EntityUtils;
+import static org.sead.uploader.AbstractUploader.println;
+import org.sead.uploader.util.Resource;
+
+/**
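+ * Uploads one part of a multipart direct upload to its presigned S3 URL and records the returned ETag in the shared map.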
+ *
+ * @author Jim
+ */
+public class HttpPartUploadJob implements Runnable {
+
+ int partNo;
+ long size;
+ String signedUrl;
+ Resource file;
+ Map<String, String> eTags;
+
+ static private long partSize = -1;
+ static private CloseableHttpClient httpClient = null;
+ static private HttpClientContext localContext = null;
+
+ public static void setHttpClient(CloseableHttpClient client) {
+ httpClient = client;
+ }
+
+ public static void setHttpClientContext(HttpClientContext context) {
+ localContext = context;
+ }
+
+ public static void setPartSize(long ps) {
+ partSize = ps;
+ }
+
+ public HttpPartUploadJob(int partNo, String url, Resource file, long size, Map<String, String> eTags) throws IllegalStateException {
+ if ((partSize == -1) || (httpClient == null) || (localContext == null)) {
+ throw new IllegalStateException("partSize, httpClient, and httpClientContext must be set before creating upload jobs");
+ }
+ this.partNo = partNo;
+ this.signedUrl = url;
+ this.file = file;
+ this.size = size;
+ this.eTags = eTags;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Runnable#run()
+ */
+ public void run() {
+ int retries = 3;
+ //println("Starting upload of part: " + partNo);
+ while (retries > 0) {
+ try (InputStream is = file.getInputStream((partNo - 1) * partSize, size)) {
+
+ HttpPut httpput = new HttpPut(signedUrl);
+ httpput.setEntity(new InputStreamEntity(is, size));
+ CloseableHttpResponse putResponse = httpClient.execute(httpput);
+ int putStatus = putResponse.getStatusLine().getStatusCode();
+ String putRes = null;
+ HttpEntity putEntity = putResponse.getEntity();
+ if (putEntity != null) {
+ putRes = EntityUtils.toString(putEntity);
+ }
+ if (putStatus == 200) {
+ //Part successfully stored - parse the eTag from the response and add it to the Map
+ String eTag = putResponse.getFirstHeader("ETag").getValue();
+ eTag = eTag.replace("\"", "");
+ eTags.put(Integer.toString(partNo), eTag);
+ retries = 0;
+ //println("Completed upload of part: " + partNo);
+ } else if (putStatus >= 500) {
+ // Server errors may be transient, so decrement retries and try again
+ println("Upload of part: " + partNo + " failed with status: " + putStatus + " (retrying)");
+ println("Error response: " + putResponse.getStatusLine() + " : " + putRes);
+ retries--;
+ } else {
+ // 4xx errors won't succeed on retry, so report and skip this part
+ println("Upload of part: " + partNo + " failed with status: " + putStatus + " (skipping)");
+ println("Error response: " + putResponse.getStatusLine() + " : " + putRes);
+ retries = 0;
+ }
+
+ } catch (IOException e) {
+ e.printStackTrace(System.out);
+ println("Error uploading part: " + partNo + " : " + e.getMessage());
+ retries = 0;
+ }
+ }
+ }
+}
diff --git a/src/main/java/org/sead/uploader/dataverse/MD5Job.java b/src/main/java/org/sead/uploader/dataverse/MD5Job.java
new file mode 100644
index 0000000..bd0b42a
--- /dev/null
+++ b/src/main/java/org/sead/uploader/dataverse/MD5Job.java
@@ -0,0 +1,62 @@
+/*
+ * To change this license header, choose License Headers in Project Properties.
+ * To change this template file, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package org.sead.uploader.dataverse;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.DigestInputStream;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Map;
+import org.apache.commons.codec.binary.Hex;
+import static org.sead.uploader.AbstractUploader.println;
+import org.sead.uploader.util.Resource;
+
+/**
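+ * Computes the file's MD5 checksum (run in parallel with the part uploads) and stores it under the "md5" key of the shared info map.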
+ *
+ * @author Jim
+ */
+public class MD5Job implements Runnable {
+
+ Resource file;
+ Map<String, String> infoMap;
+
+ public MD5Job(Resource file, Map<String, String> infoMap) {
+ this.file = file;
+ this.infoMap = infoMap;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Runnable#run()
+ */
+ public void run() {
+ try {
+ MessageDigest messageDigest = MessageDigest.getInstance("MD5");
+
+ try (InputStream inStream = file.getInputStream(); DigestInputStream digestInputStream = new DigestInputStream(inStream, messageDigest)) {
+ byte[] bytes = new byte[64 * 1024];
+ // Reading the stream fully drives the DigestInputStream, which updates the MD5 digest as a side effect
+ while (digestInputStream.read(bytes) >= 0) {
+ }
+ String checksum = Hex.encodeHexString(digestInputStream.getMessageDigest().digest());
+ infoMap.put("md5", checksum);
+ } catch (IOException e) {
+ e.printStackTrace(System.out);
+ println("Error calculating digest for: " + file.getAbsolutePath() + " : " + e.getMessage());
+ }
+ } catch (NoSuchAlgorithmException nsae) {
+ println("MD5 algorithm not found: " + nsae.getMessage());
+ }
+ }
+}
diff --git a/src/main/java/org/sead/uploader/util/FileResource.java b/src/main/java/org/sead/uploader/util/FileResource.java
index 41c0b5b..7006e06 100644
--- a/src/main/java/org/sead/uploader/util/FileResource.java
+++ b/src/main/java/org/sead/uploader/util/FileResource.java
@@ -31,12 +31,13 @@
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.input.BoundedInputStream;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.mime.content.ContentBody;
import org.apache.http.entity.mime.content.FileBody;
import org.json.JSONObject;
-public class FileResource implements Resource {
+public class FileResource extends Resource {
private File f;
@@ -163,5 +164,4 @@ public JSONObject getMetadata() {
// No extra metadata by default for files
return new JSONObject();
}
-
}
diff --git a/src/main/java/org/sead/uploader/util/PublishedFolderProxyResource.java b/src/main/java/org/sead/uploader/util/PublishedFolderProxyResource.java
index 3d09de3..d4b5977 100644
--- a/src/main/java/org/sead/uploader/util/PublishedFolderProxyResource.java
+++ b/src/main/java/org/sead/uploader/util/PublishedFolderProxyResource.java
@@ -40,7 +40,7 @@
* @author Jim
*
*/
-public class PublishedFolderProxyResource extends PublishedResource implements Resource {
+public class PublishedFolderProxyResource extends PublishedResource {
private PublishedResource resource;
private String message;
diff --git a/src/main/java/org/sead/uploader/util/PublishedResource.java b/src/main/java/org/sead/uploader/util/PublishedResource.java
index 34b24c3..b552f44 100644
--- a/src/main/java/org/sead/uploader/util/PublishedResource.java
+++ b/src/main/java/org/sead/uploader/util/PublishedResource.java
@@ -33,7 +33,7 @@
import org.json.JSONException;
import org.json.JSONObject;
-public class PublishedResource implements Resource {
+public class PublishedResource extends Resource {
protected JSONObject resource;
private String path;
diff --git a/src/main/java/org/sead/uploader/util/Resource.java b/src/main/java/org/sead/uploader/util/Resource.java
index ce75f93..74ab1e3 100644
--- a/src/main/java/org/sead/uploader/util/Resource.java
+++ b/src/main/java/org/sead/uploader/util/Resource.java
@@ -15,33 +15,55 @@
***************************************************************************** */
package org.sead.uploader.util;
+import java.io.IOException;
import java.io.InputStream;
+import org.apache.commons.io.input.BoundedInputStream;
import org.apache.http.entity.mime.content.ContentBody;
import org.json.JSONObject;
-public interface Resource extends Iterable<Resource> {
+public abstract class Resource implements Iterable<Resource> {
- String getName();
+ public abstract String getName();
- boolean isDirectory();
+ public abstract boolean isDirectory();
- String getPath();
+ public abstract String getPath();
- long length();
+ public abstract long length();
- String getAbsolutePath();
+ public abstract String getAbsolutePath();
- ContentBody getContentBody();
+ public abstract ContentBody getContentBody();
- InputStream getInputStream();
+ public abstract InputStream getInputStream();
- Iterable<Resource> listResources();
+ public abstract Iterable<Resource> listResources();
- String getHash(String algorithm);
+ public abstract String getHash(String algorithm);
- JSONObject getMetadata();
+ public abstract JSONObject getMetadata();
- String getMimeType();
+ public abstract String getMimeType();
+
+ /**
+ * Get an InputStream over a single part of this resource.
+ *
+ * @param offset the byte offset at which the part starts
+ * @param partSize the length of the part in bytes
+ * @return an InputStream bounded to the given part, or null on error
+ */
+ public InputStream getInputStream(long offset, long partSize) {
+ try {
+ InputStream is = getInputStream();
+ // skip() may skip fewer bytes than requested, so loop until the offset is reached
+ long skipped = 0;
+ while (skipped < offset) {
+ long s = is.skip(offset - skipped);
+ if (s <= 0) {
+ break;
+ }
+ skipped += s;
+ }
+ return new BoundedInputStream(is, partSize);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return null;
+ }
}