diff --git a/dspace-api/src/main/java/org/dspace/storage/bitstore/SyncS3BitStoreService.java b/dspace-api/src/main/java/org/dspace/storage/bitstore/SyncS3BitStoreService.java index 8077a966cf36..ff1e2f86740d 100644 --- a/dspace-api/src/main/java/org/dspace/storage/bitstore/SyncS3BitStoreService.java +++ b/dspace-api/src/main/java/org/dspace/storage/bitstore/SyncS3BitStoreService.java @@ -12,12 +12,22 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.security.DigestInputStream; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.List; import com.amazonaws.AmazonClientException; +import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; +import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; +import com.amazonaws.services.s3.model.PartETag; +import com.amazonaws.services.s3.model.UploadPartRequest; +import com.amazonaws.services.s3.model.UploadPartResult; import com.amazonaws.services.s3.transfer.Upload; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.dspace.content.Bitstream; @@ -38,6 +48,16 @@ public class SyncS3BitStoreService extends S3BitStoreService { private static final Logger log = LogManager.getLogger(SyncS3BitStoreService.class); private boolean syncEnabled = false; + /** + * The uploading file is divided into parts and each part is uploaded separately. The size of the part is 50 MB. + */ + private static final long UPLOAD_FILE_PART_SIZE = 50 * 1024 * 1024; // 50 MB + + /** + * Upload large file by parts - check the checksum of every part + */ + private boolean uploadByParts = false; + @Autowired(required = true) DSBitStoreService dsBitStoreService; @@ -48,9 +68,29 @@ public SyncS3BitStoreService() { super(); } + /** + * Define syncEnabled and uploadByParts in the constructor - this values won't be overridden by the configuration + * + * @param syncEnabled if true, the file will be uploaded to the local assetstore + * @param uploadByParts if true, the file will be uploaded by parts + */ + public SyncS3BitStoreService(boolean syncEnabled, boolean uploadByParts) { + super(); + this.syncEnabled = syncEnabled; + this.uploadByParts = uploadByParts; + } + public void init() throws IOException { super.init(); - syncEnabled = configurationService.getBooleanProperty("sync.storage.service.enabled", false); + + // The syncEnabled and uploadByParts could be set to true in the constructor, + // do not override them by the configuration in this case + if (!syncEnabled) { + syncEnabled = configurationService.getBooleanProperty("sync.storage.service.enabled", false); + } + if (!uploadByParts) { + uploadByParts = configurationService.getBooleanProperty("s3.upload.by.parts.enabled", false); + } } @Override @@ -66,9 +106,11 @@ public void put(Bitstream bitstream, InputStream in) throws IOException { Utils.bufferedCopy(dis, fos); in.close(); - Upload upload = tm.upload(getBucketName(), key, scratchFile); - - upload.waitForUploadResult(); + if (uploadByParts) { + uploadByParts(key, scratchFile); + } else { + uploadFluently(key, scratchFile); + } bitstream.setSizeBytes(scratchFile.length()); // we cannot use the S3 ETAG here as it could be not a MD5 in case of multipart upload (large files) or if @@ -119,6 +161,7 @@ public void remove(Bitstream bitstream) throws IOException { /** * Create a new file in the assetstore if it does not exist + * * @param localFile * @throws IOException */ @@ -137,4 +180,116 @@ private void createFileIfNotExist(File localFile) throws IOException { " was not created"); } } + + /** + * Upload a file fluently. The file is uploaded in a single request. + * + * @param key the bitstream's internalId + * @param scratchFile the file to upload + * @throws InterruptedException if the S3 upload is interrupted + */ + private void uploadFluently(String key, File scratchFile) throws InterruptedException { + Upload upload = tm.upload(getBucketName(), key, scratchFile); + + upload.waitForUploadResult(); + } + + /** + * Upload a file by parts. The file is divided into parts and each part is uploaded separately. + * The checksum of each part is checked. If the checksum does not match, the file is not uploaded. + * + * @param key the bitstream's internalId + * @param scratchFile the file to upload + * @throws IOException if an I/O error occurs + */ + private void uploadByParts(String key, File scratchFile) throws IOException { + // Initialize MessageDigest for computing checksum + MessageDigest digest; + try { + digest = MessageDigest.getInstance("MD5"); + } catch (Exception e) { + throw new RuntimeException("MD5 algorithm not available", e); + } + + // Initiate multipart upload + InitiateMultipartUploadRequest initiateRequest = new InitiateMultipartUploadRequest(getBucketName(), key); + String uploadId = this.s3Service.initiateMultipartUpload(initiateRequest).getUploadId(); + + // Create a list to hold the ETags for individual parts + List partETags = new ArrayList<>(); + + try { + // Upload parts + File file = new File(scratchFile.getPath()); + long fileLength = file.length(); + long remainingBytes = fileLength; + int partNumber = 1; + + while (remainingBytes > 0) { + long bytesToUpload = Math.min(UPLOAD_FILE_PART_SIZE, remainingBytes); + + // Calculate the checksum for the part + String partChecksum = calculatePartChecksum(file, fileLength - remainingBytes, bytesToUpload, digest); + + UploadPartRequest uploadRequest = new UploadPartRequest() + .withBucketName(this.getBucketName()) + .withKey(key) + .withUploadId(uploadId) + .withPartNumber(partNumber) + .withFile(file) + .withFileOffset(fileLength - remainingBytes) + .withPartSize(bytesToUpload); + + // Upload the part + UploadPartResult uploadPartResponse = this.s3Service.uploadPart(uploadRequest); + + // Collect the ETag for the part + partETags.add(uploadPartResponse.getPartETag()); + + // Compare checksums - local with ETag + if (!StringUtils.equals(uploadPartResponse.getETag(), partChecksum)) { + String errorMessage = "Checksums do not match error: The locally computed checksum does " + + "not match with the ETag from the UploadPartResult. Local checksum: " + partChecksum + + ", ETag: " + uploadPartResponse.getETag() + ", partNumber: " + partNumber; + log.error(errorMessage); + throw new IOException(errorMessage); + } + + remainingBytes -= bytesToUpload; + partNumber++; + } + + // Complete the multipart upload + CompleteMultipartUploadRequest completeRequest = new CompleteMultipartUploadRequest(this.getBucketName(), + key, uploadId, partETags); + this.s3Service.completeMultipartUpload(completeRequest); + } catch (AmazonClientException e) { + log.error("Cannot upload the file by parts because: ", e); + } + } + + /** + * Calculate the checksum of the specified part of the file (Multipart upload) + * + * @param file the uploading file + * @param offset the offset in the file + * @param length the length of the part + * @param digest the message digest for computing the checksum + * @return the checksum of the part + * @throws IOException if an I/O error occurs + */ + public static String calculatePartChecksum(File file, long offset, long length, MessageDigest digest) + throws IOException { + try (FileInputStream fis = new FileInputStream(file); + DigestInputStream dis = new DigestInputStream(fis, digest)) { + // Skip to the specified offset + fis.skip(offset); + + // Read the specified length + IOUtils.copyLarge(dis, OutputStream.nullOutputStream(), 0, length); + + // Convert the digest to a hex string + return Utils.toHex(digest.digest()); + } + } } diff --git a/dspace/config/clarin-dspace.cfg b/dspace/config/clarin-dspace.cfg index a13f504ab9ad..7f47d9896cd5 100644 --- a/dspace/config/clarin-dspace.cfg +++ b/dspace/config/clarin-dspace.cfg @@ -246,6 +246,8 @@ file.preview.enabled = false ### Storage service ### # Synchronization is NOT enabled by default sync.storage.service.enabled = true +# Upload large file by parts - check the checksum of every part +s3.upload.by.parts.enabled = true ### The build version is stored in the specific file ###