From cd7eb9fb9077e224bdd791efeb4433c14c9f0154 Mon Sep 17 00:00:00 2001
From: Jim Myers
Date: Thu, 9 Jul 2020 17:16:06 -0400
Subject: [PATCH] Support for multipart uploads

---
 pom.xml                                            |   2 +-
 .../org/sead/uploader/AbstractUploader.java        |   8 +
 .../sead/uploader/dataverse/DVUploader.java        | 629 ++++++++++++------
 .../uploader/dataverse/HttpPartUploadJob.java      | 107 +++
 .../org/sead/uploader/dataverse/MD5Job.java        |  62 ++
 .../org/sead/uploader/util/FileResource.java       |   4 +-
 .../util/PublishedFolderProxyResource.java         |   2 +-
 .../sead/uploader/util/PublishedResource.java      |   2 +-
 .../java/org/sead/uploader/util/Resource.java      |  46 +-
 9 files changed, 648 insertions(+), 214 deletions(-)
 create mode 100644 src/main/java/org/sead/uploader/dataverse/HttpPartUploadJob.java
 create mode 100644 src/main/java/org/sead/uploader/dataverse/MD5Job.java

diff --git a/pom.xml b/pom.xml
index d8705e6..6866d77 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,7 +4,7 @@
     <modelVersion>4.0.0</modelVersion>
     <groupId>DVUploader</groupId>
     <artifactId>DVUploader</artifactId>
-    <version>1.0.9</version>
+    <version>1.1.0</version>
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

diff --git a/src/main/java/org/sead/uploader/AbstractUploader.java b/src/main/java/org/sead/uploader/AbstractUploader.java
index 176b6cd..593ab60 100644
--- a/src/main/java/org/sead/uploader/AbstractUploader.java
+++ b/src/main/java/org/sead/uploader/AbstractUploader.java
@@ -22,6 +22,7 @@
 import java.io.PrintWriter;
 import java.net.MalformedURLException;
 import java.net.URL;
+import java.text.DecimalFormat;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -104,6 +105,13 @@ public static void println(String s) {
         }
         return;
     }
+    static DecimalFormat decimalFormat = new DecimalFormat("#.00");
+
+    public static void printStatus(float s) {
+        System.out.print("\rProgress: " + decimalFormat.format(s * 100) + "%");
+        System.out.flush();
+        return;
+    }

     public void parseArgs(String[] args) {

diff --git a/src/main/java/org/sead/uploader/dataverse/DVUploader.java b/src/main/java/org/sead/uploader/dataverse/DVUploader.java
index 02b77af..201e6c4 100644
--- a/src/main/java/org/sead/uploader/dataverse/DVUploader.java
+++ b/src/main/java/org/sead/uploader/dataverse/DVUploader.java
@@ -23,6 +23,10 @@
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -31,30 +35,34 @@
 import org.apache.commons.codec.binary.Hex;
 import org.apache.http.HttpEntity;
-import org.apache.http.HttpHeaders;
 import org.apache.http.client.ClientProtocolException;
 import org.apache.http.client.config.CookieSpecs;
 import org.apache.http.client.config.RequestConfig;
 import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.client.methods.HttpPut;
-import org.apache.http.client.methods.HttpUriRequest;
-import org.apache.http.client.methods.RequestBuilder;
 import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.config.Registry;
+import org.apache.http.config.RegistryBuilder;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
 import org.apache.http.conn.ssl.NoopHostnameVerifier;
 import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
 import org.apache.http.conn.ssl.TrustAllStrategy;
 import org.apache.http.entity.InputStreamEntity;
+import org.apache.http.entity.StringEntity;
 import org.apache.http.entity.mime.MultipartEntityBuilder;
 import org.apache.http.entity.mime.content.ContentBody;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
 import org.apache.http.ssl.SSLContextBuilder;
 import org.apache.http.util.EntityUtils;
 import org.json.JSONArray;
 import org.json.JSONObject;
 import org.sead.uploader.AbstractUploader;
+import static org.sead.uploader.AbstractUploader.println;
 import org.sead.uploader.util.UploaderException;
 import org.sead.uploader.util.Resource;
@@ -73,6 +81,22 @@ public class DVUploader extends AbstractUploader {

     private static boolean directUpload = false;
     private static boolean trustCerts = false;

+    private int timeout = 1200;
+    private int httpConcurrency = 4;
+
+    private static long mpSizeLimit = 5 * 1024 * 1024;
+    private RequestConfig config = RequestConfig.custom()
+            .setConnectTimeout(timeout * 1000)
+            .setConnectionRequestTimeout(timeout * 1000)
+            .setSocketTimeout(timeout * 1000)
+            .setCookieSpec(CookieSpecs.STANDARD)
+            .setExpectContinueEnabled(true)
+            .build();
+
+    private static HttpClientContext localContext = HttpClientContext.create();
+
+    private PoolingHttpClientConnectionManager cm = null;
+
     public static void main(String args[]) throws Exception {

         setUploader(new DVUploader());
@@ -100,7 +124,7 @@ public static void main(String args[]) throws Exception {

     private static void usage() {
         println("\nUsage:");
-        println("  java -jar DVUploader-1.0.1.jar -server=<serverURL> -key=<apiKey> -did=<dataset DOI> <file or directory list>");
+        println("  java -jar DVUploader-1.1.0.jar -server=<serverURL> -key=<apiKey> -did=<dataset DOI> <file or directory list>");

         println("\n  where:");
         println("   <serverURL> = the URL of the server to upload to, e.g. https://dataverse.tdl.org");
@@ -109,6 +133,7 @@ private static void usage() {
         println("   <file or directory list> = a space separated list of files to upload or directory name(s) where the files to upload are");
         println("\n  Optional Arguments:");
         println("   -directupload    - Use Dataverse's direct upload capability to send files directly to their final location (only works if this is enabled on the server)");
+        println("   -mpsize=<size>   - with direct upload, this is the max size before switching to multipart uploads (Note: the actual part size is server controlled)");
         println("   -listonly        - Scan the Dataset and local files and list what would be uploaded (does not upload with this flag)");
         println("   -limit=<n>       - Specify a maximum number of files to upload per invocation.");
         println("   -verify          - Check both the file name and checksum in comparing with current Dataset entries.");
@@ -139,6 +164,14 @@ public boolean parseCustomArg(String arg) {
             directUpload = true;
             println("Will use direct upload of files (if configured on this server)");
             return true;
+        } else if (arg.startsWith("-mpsize")) {
+            try {
+                mpSizeLimit = Long.parseLong(arg.substring(arg.indexOf(argSeparator) + 1));
+            } catch (NumberFormatException nfe) {
+                println("Unable to parse -mpsize as a long value, using the default: " + mpSizeLimit);
+            }
+            println("Will use multipart upload for direct upload files over " + mpSizeLimit + " bytes");
+            return true;
         } else if (arg.equals("-trustall")) {
             trustCerts = true;
             println("Will trust all certificates");
@@ -162,31 +195,12 @@ public HttpClientContext authenticate() {

     public CloseableHttpClient getSharedHttpClient() {
         if (httpclient == null) {
-            // use the TrustSelfSignedStrategy to allow Self Signed Certificates
-            SSLContext sslContext;
-            SSLConnectionSocketFactory connectionFactory;
             try {
-                if (trustCerts) {
-                    sslContext = SSLContextBuilder
-                            .create()
-                            .loadTrustMaterial(new TrustAllStrategy())
-                            .build();
-                    // create an SSL Socket Factory to use the SSLContext with the trust self signed certificate strategy
-                    // and allow all hosts verifier.
-                    connectionFactory = new SSLConnectionSocketFactory(sslContext, NoopHostnameVerifier.INSTANCE);
-                    // finally create the HttpClient using HttpClient factory methods and assign the ssl socket factory
-                    httpclient = HttpClients
-                            .custom()
-                            .setSSLSocketFactory(connectionFactory)
-                            .setDefaultRequestConfig(RequestConfig.custom().setCookieSpec(CookieSpecs.STANDARD).setExpectContinueEnabled(true).build())
-                            .build();
-                } else {
-                    httpclient = HttpClients
-                            .custom()
-                            .setDefaultRequestConfig(RequestConfig.custom().setCookieSpec(CookieSpecs.STANDARD).setExpectContinueEnabled(true).build())
-                            .build();
-
-                }
+                initHttpPool();
+                httpclient = HttpClients.custom()
+                        .setConnectionManager(cm)
+                        .setDefaultRequestConfig(config)
+                        .build();
             } catch (NoSuchAlgorithmException | KeyStoreException | KeyManagementException ex) {
                 Logger.getLogger(DVUploader.class.getName()).log(Level.SEVERE, null, ex);
@@ -388,172 +402,18 @@ protected String uploadDatafile(Resource file, String path) {
             httpclient = getSharedHttpClient();
         }
         String dataId = null;
-        int retry = 10;
+        int retries = 10;
         if (directUpload) {
-            while (retry > 0) {
-
-                try {
-
-                    // Now post data
-                    String urlString = server + "/api/datasets/:persistentId/uploadsid";
-                    urlString = urlString + "?persistentId=doi:" + datasetPID.substring(4) + "&key=" + apiKey;
-                    HttpGet httpget = new HttpGet(urlString);
-                    CloseableHttpResponse response = httpclient.execute(httpget, getLocalContext());
-                    try {
-                        int status = response.getStatusLine().getStatusCode();
-                        String uploadUrl = null;
-                        String jsonResponse = null;
-                        HttpEntity resEntity = response.getEntity();
-                        if (resEntity != null) {
-                            jsonResponse = EntityUtils.toString(resEntity);
-                        }
-                        if (status == 200) {
-                            JSONObject data = (new JSONObject(jsonResponse)).getJSONObject("data");
-                            uploadUrl = data.getString("url");
-                            String storageIdentifier = data.getString("storageIdentifier");
-
-                            HttpPut httpput = new HttpPut(uploadUrl);
-
-                            httpput.addHeader("x-amz-tagging", "dv-state=temp");
-                            MessageDigest messageDigest = MessageDigest.getInstance("MD5");
-
-                            try (InputStream inStream = file.getInputStream(); DigestInputStream digestInputStream = new DigestInputStream(inStream, messageDigest)) {
-                                // This is hte new form for requests - keeping the example but won't update until we can change all
-                                //HttpUriRequest httpput = RequestBuilder.put()
-                                //        .setUri(uploadUrl)
-                                //        .setHeader("x-amz-tagging", "dv-state=temp")
-                                //        .setEntity(new InputStreamEntity(digestInputStream, file.length()))
-                                //        .build();
-                                httpput.setEntity(new InputStreamEntity(digestInputStream, file.length()));
-                                CloseableHttpResponse putResponse = httpclient.execute(httpput);
-                                try {
-                                    int putStatus = putResponse.getStatusLine().getStatusCode();
-                                    String putRes = null;
-                                    HttpEntity putEntity = putResponse.getEntity();
-                                    if (putEntity != null) {
-                                        putRes = EntityUtils.toString(putEntity);
-                                    }
-                                    if (putStatus == 200) {
-                                        String localchecksum = Hex.encodeHexString(digestInputStream.getMessageDigest().digest());
-                                        // Now post data
-                                        urlString = server + "/api/datasets/:persistentId/add";
-                                        urlString = urlString + "?persistentId=" + datasetPID + "&key=" + apiKey;
-                                        HttpPost httppost = new HttpPost(urlString);
-
-                                        // ContentBody bin = file.getContentBody();
-                                        MultipartEntityBuilder meb = MultipartEntityBuilder.create();
-                                        // meb.addPart("file", bin);
-                                        String jsonData = "{\"storageIdentifier\":\"" + storageIdentifier + "\",\"fileName\":\""
-                                                + file.getName() + "\",\"mimeType\":\"" + file.getMimeType() + "\",\"md5Hash\":\"" + localchecksum + "\",\"fileSize\":\"" + file.length() + "\"";
-                                        if (recurse) {
-                                            // Dataverse takes paths without an initial / and ending without a /
-                                            // with the path not including the file name
-                                            if (path.substring(1).contains("/")) {
-                                                String parentPath = path.substring(1, path.lastIndexOf("/"));
-                                                jsonData = jsonData
-                                                        + (!parentPath.isEmpty() ? ",\"directoryLabel\":\"" + parentPath + "\"}"
-                                                                : "}");
-
-                                            }
-                                        } else {
-                                            jsonData = jsonData + "}";
-                                        }
-                                        meb.addTextBody("jsonData", jsonData);
-
-                                        HttpEntity reqEntity = meb.build();
-                                        httppost.setEntity(reqEntity);
-
-                                        CloseableHttpResponse postResponse = httpclient.execute(httppost, getLocalContext());
-                                        try {
-                                            int postStatus = postResponse.getStatusLine().getStatusCode();
-                                            String postRes = null;
-                                            HttpEntity postEntity = postResponse.getEntity();
-                                            if (postEntity != null) {
-                                                postRes = EntityUtils.toString(postEntity);
-                                            }
-                                            if (postStatus == 200) {
-                                                JSONObject checksum = (new JSONObject(postRes)).getJSONObject("data")
-                                                        .getJSONArray("files").getJSONObject(0).getJSONObject("dataFile")
-                                                        .getJSONObject("checksum");
-                                                dataId = checksum.getString("type") + ":" + checksum.getString("value");
-                                                retry = 0;
-                                                int total = 0;
-                                                // For new servers, wait up to maxWaitTime for a dataset lock to expire.
-                                                while (isLocked() && (total < maxWaitTime)) {
-                                                    TimeUnit.SECONDS.sleep(1);
-                                                    total = total + 1;
-                                                }
-                                            } else if (status == 400 && oldServer) {
-                                                // If the call to the lock API fails in isLocked(), oldServer will be set to
-                                                // true and
-                                                // all we can do for a lock is to keep retrying.
-                                                // Unfortunately, the error messages are configurable, so there's no guaranteed
-                                                // way to detect
-                                                // locks versus other conditions (e.g. file exists), so we can test for unique
-                                                // words in the default messages
-                                                if ((postRes != null) && postRes.contains("lock")) {
-                                                    retry--;
-                                                } else {
-                                                    println("Error response when processing " + file.getAbsolutePath() + " : "
-                                                            + response.getStatusLine().getReasonPhrase());
-                                                    // A real error: e.g. This file already exists in the dataset.
-                                                    if (postRes != null) {
-                                                        println(postRes);
-                                                    }
-                                                    // Skip
-                                                    retry = 0;
-                                                }
-                                            } else {
-                                                // An error and unlikely that we can recover, so report and move on.
- println("Error response when processing " + file.getAbsolutePath() + " : " - + response.getStatusLine().getReasonPhrase()); - if (postRes != null) { - println(postRes); - } - retry = 0; - } - } catch (InterruptedException e) { - e.printStackTrace(); - retry = 0; - } finally { - EntityUtils.consumeQuietly(response.getEntity()); - } - } else { - println("Upload failed with status: " + putStatus + " (skipping)"); - retry=0; - } - - } catch (IOException e) { - e.printStackTrace(System.out); - println("Error processing POST to Dataverse" + file.getAbsolutePath() + " : " + e.getMessage()); - retry = 0; - } - } - } else { - if (status > 500) { - retry--; - } else { - retry = 0; - } - } - - } catch (IOException e) { - e.printStackTrace(System.out); - println("Error processing file upload " + file.getAbsolutePath() + " : " + e.getMessage()); - retry = 0; - } catch (NoSuchAlgorithmException e1) { - // TODO Auto-generated catch block - e1.printStackTrace(); - } - - } catch (IOException e) { - println("Error processing request for storage id" + file.getAbsolutePath() + " : " + e.getMessage()); - retry = 0; - } + try { +//@Deprecated -used in v4.20 dataId = directFileUpload(file, path, retries); + dataId = multipartDirectFileUpload(file, path, retries); + } catch (IOException e) { + println("Error processing request for storage id" + file.getAbsolutePath() + " : " + e.getMessage()); + retries = 0; } + } else { - while (retry > 0) { + while (retries > 0) { try { // Now post data @@ -589,7 +449,7 @@ protected String uploadDatafile(Resource file, String path) { JSONObject checksum = (new JSONObject(res)).getJSONObject("data").getJSONArray("files") .getJSONObject(0).getJSONObject("dataFile").getJSONObject("checksum"); dataId = checksum.getString("type") + ":" + checksum.getString("value"); - retry = 0; + retries = 0; int total = 0; // For new servers, wait up to maxWaitTime for a dataset lock to expire. while (isLocked() && (total < maxWaitTime)) { @@ -605,7 +465,7 @@ protected String uploadDatafile(Resource file, String path) { // locks versus other conditions (e.g. file exists), so we can test for unique // words in the default messages if ((res != null) && res.contains("lock")) { - retry--; + retries--; } else { println("Error response when processing " + file.getAbsolutePath() + " : " + response.getStatusLine().getReasonPhrase()); @@ -614,7 +474,7 @@ protected String uploadDatafile(Resource file, String path) { println(res); } // Skip - retry = 0; + retries = 0; } } else { // An error and unlikely that we can recover, so report and move on. 
@@ -623,18 +483,18 @@ protected String uploadDatafile(Resource file, String path) {
                     if (res != null) {
                         println(res);
                     }
-                    retry = 0;
+                    retries = 0;
                 }
             } catch (InterruptedException e) {
                 e.printStackTrace();
-                retry = 0;
+                retries = 0;
             } finally {
                 EntityUtils.consumeQuietly(response.getEntity());
             }
         } catch (IOException e) {
             println("Error processing " + file.getAbsolutePath() + " : " + e.getMessage());
-            retry = 0;
+            retries = 0;
         }
     }
 }
@@ -678,4 +538,379 @@ private boolean isLocked() {
         }
         return false;
     }
+
+    private String directFileUpload(Resource file, String path, int retries) throws IOException {
+        String dataId = null;
+        while (retries > 0) {
+            // Request a one-time upload URL and storageIdentifier from Dataverse
+            String urlString = server + "/api/datasets/:persistentId/uploadsid";
+            urlString = urlString + "?persistentId=doi:" + datasetPID.substring(4) + "&key=" + apiKey;
+            HttpGet httpget = new HttpGet(urlString);
+            CloseableHttpResponse response = httpclient.execute(httpget, getLocalContext());
+            try {
+                int status = response.getStatusLine().getStatusCode();
+                String uploadUrl = null;
+                String jsonResponse = null;
+                HttpEntity resEntity = response.getEntity();
+                if (resEntity != null) {
+                    jsonResponse = EntityUtils.toString(resEntity);
+                }
+                if (status == 200) {
+                    JSONObject data = (new JSONObject(jsonResponse)).getJSONObject("data");
+                    uploadUrl = data.getString("url");
+                    String storageIdentifier = data.getString("storageIdentifier");
+
+                    HttpPut httpput = new HttpPut(uploadUrl);
+
+                    httpput.addHeader("x-amz-tagging", "dv-state=temp");
+                    MessageDigest messageDigest = MessageDigest.getInstance("MD5");
+
+                    try (InputStream inStream = file.getInputStream(); DigestInputStream digestInputStream = new DigestInputStream(inStream, messageDigest)) {
+                        // This is the new form for requests - keeping the example but won't update until we can change all
+                        //HttpUriRequest httpput = RequestBuilder.put()
+                        //        .setUri(uploadUrl)
+                        //        .setHeader("x-amz-tagging", "dv-state=temp")
+                        //        .setEntity(new InputStreamEntity(digestInputStream, file.length()))
+                        //        .build();
+                        httpput.setEntity(new InputStreamEntity(digestInputStream, file.length()));
+                        CloseableHttpResponse putResponse = httpclient.execute(httpput);
+                        try {
+                            int putStatus = putResponse.getStatusLine().getStatusCode();
+                            String putRes = null;
+                            HttpEntity putEntity = putResponse.getEntity();
+                            if (putEntity != null) {
+                                putRes = EntityUtils.toString(putEntity);
+                            }
+                            if (putStatus == 200) {
+                                String localchecksum = Hex.encodeHexString(digestInputStream.getMessageDigest().digest());
+                                dataId = registerFileWithDataverse(file, path, storageIdentifier, localchecksum, retries);
+                                retries = 0;
+                            }
+                        } catch (IOException e) {
+                            e.printStackTrace(System.out);
+                            println("Error processing POST to Dataverse " + file.getAbsolutePath() + " : " + e.getMessage());
+                            retries = 0;
+                        }
+                    }
+                } else {
+                    if (status > 500) {
+                        retries--;
+                    } else {
+                        retries = 0;
+                    }
+                }
+
+            } catch (IOException e) {
+                e.printStackTrace(System.out);
+                println("Error processing file upload " + file.getAbsolutePath() + " : " + e.getMessage());
+                retries = 0;
+            } catch (NoSuchAlgorithmException e1) {
+                println("Checksum algorithm not found: " + e1.getLocalizedMessage());
+            }
+        }
+        return dataId;
+    }
+
+    private String multipartDirectFileUpload(Resource file, String path, int retries) throws IOException {
+        String dataId = null;
+        while (retries > 0) {
+            // Start multipart upload with a call to Dataverse.
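+            // The file size is passed on the request below so the server can pick the part size and the number of parts.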
+            // It will make a call to S3 to start the multipart upload and will return a set of presigned URLs for us to upload the parts
+            String urlString = server + "/api/datasets/:persistentId/uploadurls";
+            urlString = urlString + "?persistentId=doi:" + datasetPID.substring(4) + "&key=" + apiKey + "&size=" + file.length();
+            HttpGet httpget = new HttpGet(urlString);
+            CloseableHttpResponse response = httpclient.execute(httpget, getLocalContext());
+            try {
+                int status = response.getStatusLine().getStatusCode();
+
+                String jsonResponse = null;
+                HttpEntity resEntity = response.getEntity();
+                if (resEntity != null) {
+                    jsonResponse = EntityUtils.toString(resEntity);
+                }
+                if (status == 200) {
+                    println(jsonResponse);
+                    JSONObject uploadResponse = (new JSONObject(jsonResponse)).getJSONObject("data");
+                    //Along with the parts, which should be listed numerically, we get convenience URLs to call on Dataverse to abort or complete the multipart upload
+                    //default part size, kept for backwards compatibility with test servers that don't report one
+                    long maxPartSize = 5 * 1024 * 1024l;
+                    if (uploadResponse.has("partSize")) {
+                        maxPartSize = uploadResponse.getLong("partSize");
+                    }
+                    if (uploadResponse.has("url")) {
+                        String storageIdentifier = uploadResponse.getString("storageIdentifier");
+                        String uploadUrl = uploadResponse.getString("url");
+                        HttpPut httpput = new HttpPut(uploadUrl);
+
+                        httpput.addHeader("x-amz-tagging", "dv-state=temp");
+                        try {
+                            MessageDigest messageDigest = MessageDigest.getInstance("MD5");
+
+                            try (InputStream inStream = file.getInputStream(); DigestInputStream digestInputStream = new DigestInputStream(inStream, messageDigest)) {
+                                // This is the new form for requests - keeping the example but won't update until we can change all
+                                //HttpUriRequest httpput = RequestBuilder.put()
+                                //        .setUri(uploadUrl)
+                                //        .setHeader("x-amz-tagging", "dv-state=temp")
+                                //        .setEntity(new InputStreamEntity(digestInputStream, file.length()))
+                                //        .build();
+                                httpput.setEntity(new InputStreamEntity(digestInputStream, file.length()));
+                                CloseableHttpResponse putResponse = httpclient.execute(httpput);
+                                try {
+                                    int putStatus = putResponse.getStatusLine().getStatusCode();
+                                    String putRes = null;
+                                    HttpEntity putEntity = putResponse.getEntity();
+                                    if (putEntity != null) {
+                                        putRes = EntityUtils.toString(putEntity);
+                                    }
+                                    if (putStatus == 200) {
+                                        String localchecksum = Hex.encodeHexString(digestInputStream.getMessageDigest().digest());
+                                        dataId = registerFileWithDataverse(file, path, storageIdentifier, localchecksum, retries);
+                                        if (dataId != null) {
+                                            retries = 0;
+                                        } else {
+                                            println("Failure registering " + file.getName() + " with Dataverse");
+                                        }
+                                    }
+                                } catch (IOException e) {
+                                    e.printStackTrace(System.out);
+                                    println("Error processing POST to Dataverse " + file.getAbsolutePath() + " : " + e.getMessage());
+                                    retries = 0;
+                                }
+                            }
+
+                        } catch (NoSuchAlgorithmException nsae) {
+                            println("MD5 algorithm not found: " + nsae.getMessage());
+                        }
+                    } else {
+
+                        String abortUrl = uploadResponse.getString("abort");
+                        String completeUrl = uploadResponse.getString("complete");
+                        JSONObject uploadUrls = uploadResponse.getJSONObject("urls");
+
+                        //And we're sent the storageIdentifier - not strictly needed since the storageIdentifier is already in all the URLs where it's needed.
+                        String storageIdentifier = uploadResponse.getString("storageIdentifier");
+
+                        //Queue up all parts so that each part can be uploaded via a different thread.
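+                        //Each job PUTs one byte range of the file to its presigned part URL and records the ETag header S3 returns; the collected ETags are required later to complete the multipart upload.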
+                        //Using httpConcurrency number of threads to match the number of open HTTP calls to the S3 server that we're allowed in the HttpClient pool
+                        ExecutorService executor = Executors.newFixedThreadPool(httpConcurrency);
+
+                        //Give the HttpPartUploadJob class the common info it needs to send parts to S3
+                        HttpPartUploadJob.setHttpClient(getSharedHttpClient());
+                        HttpPartUploadJob.setHttpClientContext(getLocalContext());
+                        HttpPartUploadJob.setPartSize(maxPartSize);
+
+                        //Create a map to store the eTags from the parts and the md5 calculated for the whole file
+                        Map<String, String> mpUploadInfoMap = new HashMap<String, String>(uploadUrls.length() + 1);
+                        //Set up a job to calculate the md5 hash of the file
+                        //Probably helpful to have it run in parallel, but it could be a pre or post step as well. If the network is fast relative to disk, we may want the executor to use one extra thread for this
+                        MD5Job mjob = new MD5Job(file, mpUploadInfoMap);
+                        executor.execute(mjob);
+
+                        //Now set up upload jobs for each part
+                        int i = 1;
+                        long remainingSize = file.length();
+                        while (uploadUrls.has(Integer.toString(i))) {
+                            //Calculate part size
+                            long partSize = maxPartSize;
+                            if (remainingSize < maxPartSize) {
+                                partSize = remainingSize;
+                            }
+                            remainingSize -= partSize;
+                            println("Creating job for " + partSize + " bytes");
+                            HttpPartUploadJob uj = new HttpPartUploadJob(i, uploadUrls.getString(Integer.toString(i)), file, partSize, mpUploadInfoMap);
+
+                            executor.execute(uj);
+                            i++;
+                        }
+                        float total = (float) (i - 1);
+                        println("All " + (i - 1) + " parts for: " + storageIdentifier + " queued.");
+                        //Tell the executor that there are no more jobs coming
+                        executor.shutdown();
+                        //And wait until it finishes the ones that are running
+                        try {
+                            while (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
+                                printStatus(mpUploadInfoMap.size() / total);
+                            }
+                        } catch (InterruptedException e) {
+                            println("Upload of " + storageIdentifier + " interrupted: " + e.getMessage());
+                            //Call abort?
+                        }
+                        boolean fileUploadComplete = true;
+
+                        for (String part : (Set<String>) uploadUrls.keySet()) {
+                            if (!mpUploadInfoMap.containsKey(part)) {
+                                fileUploadComplete = false;
+                                break;
+                            }
+                        }
+                        //Technically, the uploads to S3 could succeed with only the md5 calculation failing, but in this case we still want to abort the MP Upload, not complete it.
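+                        //Aborting tells S3 to discard any parts that were uploaded, rather than leaving an incomplete multipart upload behind.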
+                        if (!mpUploadInfoMap.containsKey("md5")) {
+                            fileUploadComplete = false;
+                        }
+                        if (fileUploadComplete) {
+                            println("Part uploads completed for " + storageIdentifier);
+                            HttpPut completeUpload = new HttpPut(server + completeUrl + "&key=" + apiKey);
+                            JSONObject eTags = new JSONObject();
+                            for (String partNo : (Set<String>) mpUploadInfoMap.keySet()) {
+                                if (!partNo.equals("md5")) {
+                                    eTags.put(partNo, mpUploadInfoMap.get(partNo));
+                                }
+                            }
+                            StringEntity body = new StringEntity(eTags.toString());
+                            println("ETags: " + eTags.toString());
+                            completeUpload.setEntity(body);
+                            completeUpload.setHeader("Content-type", "application/json");
+
+                            response = httpclient.execute(completeUpload, getLocalContext());
+                            EntityUtils.consumeQuietly(response.getEntity());
+                            status = response.getStatusLine().getStatusCode();
+                            if (status == 200) {
+                                println("Successful upload of " + file.getAbsolutePath());
+                                dataId = registerFileWithDataverse(file, path, storageIdentifier, mpUploadInfoMap.get("md5"), retries);
+                            } else {
+                                println("Partial upload of " + file.getAbsolutePath() + ", complete upload failed with status: " + status);
+                            }
+
+                            retries = 0;
+                        } else {
+                            HttpDelete delete = new HttpDelete(server + abortUrl + "&key=" + apiKey);
+                            response = httpclient.execute(delete, getLocalContext());
+                            EntityUtils.consumeQuietly(response.getEntity());
+                            status = response.getStatusLine().getStatusCode();
+                            if (status != 204) {
+                                println("Call to " + abortUrl + " failed with status: " + status);
+                                retries = 0;
+                            } else {
+                                println("Upload of " + file.getAbsolutePath() + " failed and upload request successfully aborted.");
+                                println("Upload of large files is not automatically retried - run again to retry this file upload.");
+                                retries = 0;
+                            }
+                        }
+                    }
+                } else {
+                    println("Retrying: return status was: " + status);
+                    retries--;
+                }
+            } catch (IOException e) {
+                e.printStackTrace(System.out);
+                println("Error processing file upload " + file.getAbsolutePath() + " : " + e.getMessage());
+                retries = 0;
+            }
+        }
+        return dataId;
+    }
+
+    private String registerFileWithDataverse(Resource file, String path, String storageIdentifier, String checksum, int retries) {
+        String dataId = null;
+        // Now post data
+        String urlString = server + "/api/datasets/:persistentId/add";
+        urlString = urlString + "?persistentId=" + datasetPID + "&key=" + apiKey;
+        while (retries > 0) {
+            HttpPost httppost = new HttpPost(urlString);
+
+            // ContentBody bin = file.getContentBody();
+            MultipartEntityBuilder meb = MultipartEntityBuilder.create();
+            // meb.addPart("file", bin);
+            String jsonData = "{\"storageIdentifier\":\"" + storageIdentifier + "\",\"fileName\":\""
+                    + file.getName() + "\",\"mimeType\":\"" + file.getMimeType() + "\",\"md5Hash\":\"" + checksum + "\",\"fileSize\":\"" + file.length() + "\"";
+            if (recurse) {
+                // Dataverse takes paths without an initial / and ending without a /
+                // with the path not including the file name
+                if (path.substring(1).contains("/")) {
+                    String parentPath = path.substring(1, path.lastIndexOf("/"));
+                    jsonData = jsonData
",\"directoryLabel\":\"" + parentPath + "\"}" + : "}"); + + } + } else { + jsonData = jsonData + "}"; + } + meb.addTextBody("jsonData", jsonData); + + HttpEntity reqEntity = meb.build(); + httppost.setEntity(reqEntity); + try { + CloseableHttpResponse postResponse = httpclient.execute(httppost, getLocalContext()); + + int postStatus = postResponse.getStatusLine().getStatusCode(); + String postRes = null; + HttpEntity postEntity = postResponse.getEntity(); + if (postEntity != null) { + postRes = EntityUtils.toString(postEntity); + } + if (postStatus == 200) { + JSONObject checksumObject = (new JSONObject(postRes)).getJSONObject("data") + .getJSONArray("files").getJSONObject(0).getJSONObject("dataFile") + .getJSONObject("checksum"); + dataId = checksumObject.getString("type") + ":" + checksumObject.getString("value"); + retries = 0; + int total = 0; + // For new servers, wait up to maxWaitTime for a dataset lock to expire. + while (isLocked() && (total < maxWaitTime)) { + TimeUnit.SECONDS.sleep(1); + total = total + 1; + } + } else if (postStatus == 400 && oldServer) { + // If the call to the lock API fails in isLocked(), oldServer will be set to + // true and + // all we can do for a lock is to keep retrying. + // Unfortunately, the error messages are configurable, so there's no guaranteed + // way to detect + // locks versus other conditions (e.g. file exists), so we can test for unique + // words in the default messages + if ((postRes != null) && postRes.contains("lock")) { + retries--; + } else { + println("Error response when processing " + file.getAbsolutePath() + " : " + + postResponse.getStatusLine().getReasonPhrase()); + // A real error: e.g. This file already exists in the dataset. + if (postRes != null) { + println(postRes); + } + // Skip + retries = 0; + } + } else { + // An error and unlikely that we can recover, so report and move on. + println("Error response when processing " + file.getAbsolutePath() + " : " + + postResponse.getStatusLine().getReasonPhrase()); + if (postRes != null) { + println(postRes); + } + retries = 0; + } + } catch (InterruptedException e) { + e.printStackTrace(); + retries = 0; + } catch (IOException ex) { + retries = 0; + println("Error registering file with dataverse: " + storageIdentifier + " : " + ex.getMessage()); + } + } + return dataId; + } + + private void initHttpPool() throws NoSuchAlgorithmException, KeyManagementException, KeyStoreException { + if (trustCerts) { + // use the TrustSelfSignedStrategy to allow Self Signed Certificates + SSLContext sslContext; + SSLConnectionSocketFactory connectionFactory; + + sslContext = SSLContextBuilder + .create() + .loadTrustMaterial(new TrustAllStrategy()) + .build(); + // create an SSL Socket Factory to use the SSLContext with the trust self signed certificate strategy + // and allow all hosts verifier. + connectionFactory = new SSLConnectionSocketFactory(sslContext, NoopHostnameVerifier.INSTANCE); + + Registry registry = RegistryBuilder.create() + .register("https", connectionFactory).build(); + cm = new PoolingHttpClientConnectionManager(registry); + } else { + cm = new PoolingHttpClientConnectionManager(); + } + cm.setDefaultMaxPerRoute(httpConcurrency); + cm.setMaxTotal(httpConcurrency > 20 ? 
+    }
 }

diff --git a/src/main/java/org/sead/uploader/dataverse/HttpPartUploadJob.java b/src/main/java/org/sead/uploader/dataverse/HttpPartUploadJob.java
new file mode 100644
index 0000000..6396a26
--- /dev/null
+++ b/src/main/java/org/sead/uploader/dataverse/HttpPartUploadJob.java
@@ -0,0 +1,107 @@
+/*
+ * To change this license header, choose License Headers in Project Properties.
+ * To change this template file, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package org.sead.uploader.dataverse;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.entity.InputStreamEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.util.EntityUtils;
+import static org.sead.uploader.AbstractUploader.println;
+import org.sead.uploader.util.Resource;
+
+/**
+ *
+ * @author Jim
+ */
+public class HttpPartUploadJob implements Runnable {
+
+    int partNo;
+    long size;
+    String signedUrl;
+    Resource file;
+    Map<String, String> eTags;
+
+    static private long partSize = -1;
+    static private CloseableHttpClient httpClient = null;
+    static private HttpClientContext localContext = null;
+
+    public static void setHttpClient(CloseableHttpClient client) {
+        httpClient = client;
+    }
+
+    public static void setHttpClientContext(HttpClientContext context) {
+        localContext = context;
+    }
+
+    public static void setPartSize(long ps) {
+        partSize = ps;
+    }
+
+    public HttpPartUploadJob(int partNo, String url, Resource file, long size, Map<String, String> eTags) throws IllegalStateException {
+        if ((partSize == -1) || (httpClient == null) || (localContext == null)) {
+            throw new IllegalStateException("partSize, httpClient, or localContext not set");
+        }
+        this.partNo = partNo;
+        this.signedUrl = url;
+        this.file = file;
+        this.size = size;
+        this.eTags = eTags;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see java.lang.Runnable#run()
+     */
+    public void run() {
+        int retries = 3;
+        //println("Starting upload of part: " + partNo);
+        while (retries > 0) {
+            try (InputStream is = file.getInputStream((partNo - 1) * partSize, size)) {
+
+                HttpPut httpput = new HttpPut(signedUrl);
+                httpput.setEntity(new InputStreamEntity(is, size));
+                CloseableHttpResponse putResponse = httpClient.execute(httpput);
+                int putStatus = putResponse.getStatusLine().getStatusCode();
+                String putRes = null;
+                HttpEntity putEntity = putResponse.getEntity();
+                if (putEntity != null) {
+                    putRes = EntityUtils.toString(putEntity);
+                }
+                if (putStatus == 200) {
+                    //Part successfully stored - parse the eTag from the response and add it to the Map
+                    String eTag = putResponse.getFirstHeader("ETag").getValue();
+                    eTag = eTag.replace("\"", "");
+                    eTags.put(Integer.toString(partNo), eTag);
+                    retries = 0;
+                    //println("Completed upload of part: " + partNo);
+                } else {
+                    if (putStatus >= 500) {
+                        println("Upload of part: " + partNo + " failed with status: " + putStatus + " (retrying)");
+                        println("Error response: " + putResponse.getStatusLine() + " : " + putRes);
+                        retries--;
+                    } else {
+                        println("Upload of part: " + partNo + " failed with status: " + putStatus + " (skipping)");
+                        println("Error response: " + putResponse.getStatusLine() + " : " + putRes);
+                        retries = 0;
+                    }
+                }
+
+            } catch (IOException e) {
+                e.printStackTrace(System.out);
+                println("Error uploading part: " + partNo + " : " + e.getMessage());
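+                // An IOException on the part stream is unlikely to resolve on an immediate retry, so give up on this part; the missing ETag will cause the file-level logic to abort the multipart upload.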
+                retries = 0;
+            }
+        }
+    }
+}

diff --git a/src/main/java/org/sead/uploader/dataverse/MD5Job.java b/src/main/java/org/sead/uploader/dataverse/MD5Job.java
new file mode 100644
index 0000000..bd0b42a
--- /dev/null
+++ b/src/main/java/org/sead/uploader/dataverse/MD5Job.java
@@ -0,0 +1,62 @@
+/*
+ * To change this license header, choose License Headers in Project Properties.
+ * To change this template file, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package org.sead.uploader.dataverse;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.DigestInputStream;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Map;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.entity.InputStreamEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.util.EntityUtils;
+import static org.sead.uploader.AbstractUploader.println;
+import org.sead.uploader.util.Resource;
+
+/**
+ *
+ * @author Jim
+ */
+public class MD5Job implements Runnable {
+
+    Resource file;
+    Map<String, String> infoMap;
+
+    public MD5Job(Resource file, Map<String, String> infoMap) throws IllegalStateException {
+        this.file = file;
+        this.infoMap = infoMap;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see java.lang.Runnable#run()
+     */
+    public void run() {
+        try {
+            MessageDigest messageDigest = MessageDigest.getInstance("MD5");
+
+            try (InputStream inStream = file.getInputStream(); DigestInputStream digestInputStream = new DigestInputStream(inStream, messageDigest)) {
+                byte[] bytes = new byte[64 * 1024];
+                while (digestInputStream.read(bytes) >= 0) {
+                }
+                String checksum = Hex.encodeHexString(digestInputStream.getMessageDigest().digest());
+                infoMap.put("md5", checksum);
+            } catch (IOException e) {
+                e.printStackTrace(System.out);
+                println("Error calculating digest for: " + file.getAbsolutePath() + " : " + e.getMessage());
+            }
+        } catch (NoSuchAlgorithmException nsae) {
+            println("MD5 algorithm not found: " + nsae.getMessage());
+        }
+    }
+}

diff --git a/src/main/java/org/sead/uploader/util/FileResource.java b/src/main/java/org/sead/uploader/util/FileResource.java
index 41c0b5b..7006e06 100644
--- a/src/main/java/org/sead/uploader/util/FileResource.java
+++ b/src/main/java/org/sead/uploader/util/FileResource.java
@@ -31,12 +31,13 @@
 import org.apache.commons.codec.binary.Hex;
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.input.BoundedInputStream;
 import org.apache.http.entity.ContentType;
 import org.apache.http.entity.mime.content.ContentBody;
 import org.apache.http.entity.mime.content.FileBody;
 import org.json.JSONObject;

-public class FileResource implements Resource {
+public class FileResource extends Resource {

     private File f;
@@ -163,5 +164,4 @@ public JSONObject getMetadata() {
         // No extra metadata by default for files
         return new JSONObject();
     }
-
 }

diff --git a/src/main/java/org/sead/uploader/util/PublishedFolderProxyResource.java b/src/main/java/org/sead/uploader/util/PublishedFolderProxyResource.java
index 3d09de3..d4b5977 100644
--- a/src/main/java/org/sead/uploader/util/PublishedFolderProxyResource.java
+++ b/src/main/java/org/sead/uploader/util/PublishedFolderProxyResource.java
@@ -40,7 +40,7 @@
  * @author Jim
  *
  */
-public class PublishedFolderProxyResource extends PublishedResource implements Resource {
+public class PublishedFolderProxyResource extends PublishedResource {

     private PublishedResource resource;
     private String message;

diff --git a/src/main/java/org/sead/uploader/util/PublishedResource.java b/src/main/java/org/sead/uploader/util/PublishedResource.java
index 34b24c3..b552f44 100644
--- a/src/main/java/org/sead/uploader/util/PublishedResource.java
+++ b/src/main/java/org/sead/uploader/util/PublishedResource.java
@@ -33,7 +33,7 @@
 import org.json.JSONException;
 import org.json.JSONObject;

-public class PublishedResource implements Resource {
+public class PublishedResource extends Resource {

     protected JSONObject resource;
     private String path;

diff --git a/src/main/java/org/sead/uploader/util/Resource.java b/src/main/java/org/sead/uploader/util/Resource.java
index ce75f93..74ab1e3 100644
--- a/src/main/java/org/sead/uploader/util/Resource.java
+++ b/src/main/java/org/sead/uploader/util/Resource.java
@@ -15,33 +15,55 @@
  *****************************************************************************
 */
 package org.sead.uploader.util;

+import java.io.IOException;
 import java.io.InputStream;
+import org.apache.commons.io.input.BoundedInputStream;
 import org.apache.http.entity.mime.content.ContentBody;
 import org.json.JSONObject;

-public interface Resource extends Iterable<Resource> {
+public abstract class Resource implements Iterable<Resource> {

-    String getName();
+    public abstract String getName();

-    boolean isDirectory();
+    public abstract boolean isDirectory();

-    String getPath();
+    public abstract String getPath();

-    long length();
+    public abstract long length();

-    String getAbsolutePath();
+    public abstract String getAbsolutePath();

-    ContentBody getContentBody();
+    public abstract ContentBody getContentBody();

-    InputStream getInputStream();
+    public abstract InputStream getInputStream();

-    Iterable<Resource> listResources();
+    public abstract Iterable<Resource> listResources();

-    String getHash(String algorithm);
+    public abstract String getHash(String algorithm);

-    JSONObject getMetadata();
+    public abstract JSONObject getMetadata();

-    String getMimeType();
+    public abstract String getMimeType();
+
+    /**
+     * Opens a stream over a single part of this resource, as used by the
+     * multipart upload jobs.
+     *
+     * @param offset the byte offset at which the part starts
+     * @param partSize the number of bytes in the part
+     * @return a stream bounded to the requested part, or null if the resource
+     *         could not be read
+     */
+    public InputStream getInputStream(long offset, long partSize) {
+        try {
+            InputStream is = getInputStream();
+            // skip() may skip fewer bytes than requested, so loop until the
+            // full offset has been consumed (or the stream ends early)
+            long remaining = offset;
+            while (remaining > 0) {
+                long skipped = is.skip(remaining);
+                if (skipped <= 0) {
+                    break;
+                }
+                remaining -= skipped;
+            }
+            return new BoundedInputStream(is, partSize);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        return null;
+    }
 }