From 3f2ecbb00a4ceca77b09ac83d97e1a4709daa32a Mon Sep 17 00:00:00 2001 From: milanmajchrak <90026355+milanmajchrak@users.noreply.github.com> Date: Thu, 21 Dec 2023 17:30:25 +0100 Subject: [PATCH] ufal/be-s3-customization (#481) * The bitstream data are stored in the local store after uploading to S3 * The bitstream is removed from both S3 and the local assetstore * A specific store number is used if all storages are synchronized. * Return store number in the BitstreamRest object * Find out which store is used - it could be the synchronized stores number. * Constant is moved to the Bitstream class * Synchronization of storages is not allowed by default - set it up in the test environment. * Added docs * Removed constant from the Bitstream class - it wasn't consistent * Overridden BitstreamStorageServiceImpl by custom SyncBitstreamStorageServiceImpl * Removed ClarinS3BitStoreService.java to SyncS3BitStoreService * Added doc and refactoring. --- .../storage/bitstore/S3BitStoreService.java | 6 +- .../SyncBitstreamStorageServiceImpl.java | 330 ++++++++++++++++++ .../bitstore/SyncS3BitStoreService.java | 101 ++++++ .../test/data/dspaceFolder/config/local.cfg | 3 + .../rest/converter/BitstreamConverter.java | 1 + .../dspace/app/rest/model/BitstreamRest.java | 10 + dspace/config/clarin-dspace.cfg | 4 + dspace/config/spring/api/bitstore.xml | 4 +- 8 files changed, 454 insertions(+), 5 deletions(-) create mode 100644 dspace-api/src/main/java/org/dspace/storage/bitstore/SyncBitstreamStorageServiceImpl.java create mode 100644 dspace-api/src/main/java/org/dspace/storage/bitstore/SyncS3BitStoreService.java diff --git a/dspace-api/src/main/java/org/dspace/storage/bitstore/S3BitStoreService.java b/dspace-api/src/main/java/org/dspace/storage/bitstore/S3BitStoreService.java index ad6c431aed9e..1ad4c33f8213 100644 --- a/dspace-api/src/main/java/org/dspace/storage/bitstore/S3BitStoreService.java +++ b/dspace-api/src/main/java/org/dspace/storage/bitstore/S3BitStoreService.java @@ -73,7 +73,7 @@ public 
class S3BitStoreService extends BaseBitStoreService { /** * Checksum algorithm */ - private static final String CSA = "MD5"; + protected static final String CSA = "MD5"; // These settings control the way an identifier is hashed into // directory and file names @@ -110,13 +110,13 @@ public class S3BitStoreService extends BaseBitStoreService { /** * S3 service */ - private AmazonS3 s3Service = null; + protected AmazonS3 s3Service = null; /** * S3 transfer manager * this is reused between put calls to use less resources for multiple uploads */ - private TransferManager tm = null; + protected TransferManager tm = null; private static final ConfigurationService configurationService = DSpaceServicesFactory.getInstance().getConfigurationService(); diff --git a/dspace-api/src/main/java/org/dspace/storage/bitstore/SyncBitstreamStorageServiceImpl.java b/dspace-api/src/main/java/org/dspace/storage/bitstore/SyncBitstreamStorageServiceImpl.java new file mode 100644 index 000000000000..e48487955209 --- /dev/null +++ b/dspace-api/src/main/java/org/dspace/storage/bitstore/SyncBitstreamStorageServiceImpl.java @@ -0,0 +1,330 @@ +/** + * The contents of this file are subject to the license and copyright + * detailed in the LICENSE and NOTICE files at the root of the source + * tree and available online at + * + * http://www.dspace.org/license/ + */ +package org.dspace.storage.bitstore; + +import java.io.IOException; +import java.io.InputStream; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.UUID; +import javax.annotation.Nullable; + +import org.apache.commons.collections4.MapUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.dspace.authorize.AuthorizeException; +import org.dspace.content.Bitstream; +import org.dspace.core.Context; +import org.dspace.core.Utils; +import org.dspace.services.ConfigurationService; +import 
org.springframework.beans.factory.annotation.Autowired; + +/** + * This class is customization of the BitstreamStorageServiceImpl class. + * The bitstream is synchronized if it is stored in both S3 and local assetstore. + * + * @author Milan Majchrak (milan.majchrak at dataquest.sk) + */ +public class SyncBitstreamStorageServiceImpl extends BitstreamStorageServiceImpl { + + /** + * log4j log + */ + private static final Logger log = LogManager.getLogger(); + private boolean syncEnabled = false; + + private static final int SYNCHRONIZED_STORES_NUMBER = 77; + + @Autowired + ConfigurationService configurationService; + + public SyncBitstreamStorageServiceImpl() { + super(); + } + + @Override + public void afterPropertiesSet() throws Exception { + for (Map.Entry storeEntry : getStores().entrySet()) { + if (storeEntry.getValue().isEnabled() && !storeEntry.getValue().isInitialized()) { + storeEntry.getValue().init(); + } + } + this.syncEnabled = configurationService.getBooleanProperty("sync.storage.service.enabled", false); + } + + @Override + public UUID store(Context context, Bitstream bitstream, InputStream is) throws SQLException, IOException { + // Create internal ID + String id = Utils.generateKey(); + /* + * Set the store number of the new bitstream If you want to use some + * other method of working out where to put a new bitstream, here's + * where it should go + */ + if (syncEnabled) { + bitstream.setStoreNumber(SYNCHRONIZED_STORES_NUMBER); + } else { + bitstream.setStoreNumber(getIncoming()); + } + bitstream.setDeleted(true); + bitstream.setInternalId(id); + + + BitStoreService store = this.getStore(getIncoming()); + //For efficiencies sake, PUT is responsible for setting bitstream size_bytes, checksum, and checksum_algorithm + store.put(bitstream, is); + //bitstream.setSizeBytes(file.length()); + //bitstream.setChecksum(Utils.toHex(dis.getMessageDigest().digest())); + //bitstream.setChecksumAlgorithm("MD5"); + + bitstream.setDeleted(false); + try { + //Update 
our bitstream but turn off the authorization system since permissions haven't been set at this + // point in time. + context.turnOffAuthorisationSystem(); + bitstreamService.update(context, bitstream); + } catch (AuthorizeException e) { + log.error(e); + //Can never happen since we turn off authorization before we update + } finally { + context.restoreAuthSystemState(); + } + + UUID bitstreamId = bitstream.getID(); + + if (log.isDebugEnabled()) { + log.debug("Stored bitstreamID " + bitstreamId); + } + + return bitstreamId; + } + + /** + * Register a bitstream already in storage. + * + * @param context The current context + * @param assetstore The assetstore number for the bitstream to be + * registered + * @param bitstreamPath The relative path of the bitstream to be registered. + * The path is relative to the path of the assetstore. + * @return The ID of the registered bitstream + * @throws SQLException If a problem occurs accessing the RDBMS + * @throws IOException If an I/O error occurs + */ + @Override + public UUID register(Context context, Bitstream bitstream, int assetstore, + String bitstreamPath) throws SQLException, IOException, AuthorizeException { + + // mark this bitstream as a registered bitstream + String sInternalId = REGISTERED_FLAG + bitstreamPath; + + // Create a deleted bitstream row, using a separate DB connection + bitstream.setDeleted(true); + bitstream.setInternalId(sInternalId); + if (syncEnabled) { + bitstream.setStoreNumber(SYNCHRONIZED_STORES_NUMBER); + } else { + bitstream.setStoreNumber(assetstore); + } + bitstreamService.update(context, bitstream); + + Map wantedMetadata = new HashMap(); + wantedMetadata.put("size_bytes", null); + wantedMetadata.put("checksum", null); + wantedMetadata.put("checksum_algorithm", null); + + Map receivedMetadata = this.getStore(assetstore).about(bitstream, wantedMetadata); + if (MapUtils.isEmpty(receivedMetadata)) { + String message = "Not able to register bitstream:" + bitstream.getID() + " at path: " + 
bitstreamPath; + log.error(message); + throw new IOException(message); + } else { + if (receivedMetadata.containsKey("checksum_algorithm")) { + bitstream.setChecksumAlgorithm(receivedMetadata.get("checksum_algorithm").toString()); + } + + if (receivedMetadata.containsKey("checksum")) { + bitstream.setChecksum(receivedMetadata.get("checksum").toString()); + } + + if (receivedMetadata.containsKey("size_bytes")) { + bitstream.setSizeBytes(Long.valueOf(receivedMetadata.get("size_bytes").toString())); + } + } + + bitstream.setDeleted(false); + bitstreamService.update(context, bitstream); + + UUID bitstreamId = bitstream.getID(); + if (log.isDebugEnabled()) { + log.debug("Registered bitstream " + bitstreamId + " at location " + bitstreamPath); + } + return bitstreamId; + } + + @Override + public Map computeChecksum(Context context, Bitstream bitstream) throws IOException { + Map wantedMetadata = new HashMap(); + wantedMetadata.put("checksum", null); + wantedMetadata.put("checksum_algorithm", null); + + int storeNumber = this.whichStoreNumber(bitstream); + Map receivedMetadata = this.getStore(storeNumber).about(bitstream, wantedMetadata); + return receivedMetadata; + } + + @Override + public InputStream retrieve(Context context, Bitstream bitstream) + throws SQLException, IOException { + int storeNumber = this.whichStoreNumber(bitstream); + return this.getStore(storeNumber).get(bitstream); + } + + @Override + public void cleanup(boolean deleteDbRecords, boolean verbose) throws SQLException, IOException, AuthorizeException { + Context context = new Context(Context.Mode.BATCH_EDIT); + int commitCounter = 0; + + try { + context.turnOffAuthorisationSystem(); + + List storage = bitstreamService.findDeletedBitstreams(context); + for (Bitstream bitstream : storage) { + UUID bid = bitstream.getID(); + Map wantedMetadata = new HashMap(); + wantedMetadata.put("size_bytes", null); + wantedMetadata.put("modified", null); + + int storeNumber = this.whichStoreNumber(bitstream); + Map 
receivedMetadata = this.getStore(storeNumber).about(bitstream, wantedMetadata); + + + // Make sure entries which do not exist are removed + if (MapUtils.isEmpty(receivedMetadata)) { + log.debug("bitstore.about is empty, so file is not present"); + if (deleteDbRecords) { + log.debug("deleting record"); + if (verbose) { + System.out.println(" - Deleting bitstream information (ID: " + bid + ")"); + } + checksumHistoryService.deleteByBitstream(context, bitstream); + if (verbose) { + System.out.println(" - Deleting bitstream record from database (ID: " + bid + ")"); + } + bitstreamService.expunge(context, bitstream); + } + context.uncacheEntity(bitstream); + continue; + } + + // This is a small chance that this is a file which is + // being stored -- get it next time. + if (isRecent(Long.valueOf(receivedMetadata.get("modified").toString()))) { + log.debug("file is recent"); + context.uncacheEntity(bitstream); + continue; + } + + if (deleteDbRecords) { + log.debug("deleting db record"); + if (verbose) { + System.out.println(" - Deleting bitstream information (ID: " + bid + ")"); + } + checksumHistoryService.deleteByBitstream(context, bitstream); + if (verbose) { + System.out.println(" - Deleting bitstream record from database (ID: " + bid + ")"); + } + bitstreamService.expunge(context, bitstream); + } + + if (isRegisteredBitstream(bitstream.getInternalId())) { + context.uncacheEntity(bitstream); + continue; // do not delete registered bitstreams + } + + + // Since versioning allows for multiple bitstreams, check if the internal identifier isn't used on + // another place + if (bitstreamService.findDuplicateInternalIdentifier(context, bitstream).isEmpty()) { + this.getStore(storeNumber).remove(bitstream); + + String message = ("Deleted bitstreamID " + bid + ", internalID " + bitstream.getInternalId()); + if (log.isDebugEnabled()) { + log.debug(message); + } + if (verbose) { + System.out.println(message); + } + } + + // Make sure to commit our outstanding work every 100 + 
// iterations. Otherwise you risk losing the entire transaction + // if we hit an exception, which isn't useful at all for large + // amounts of bitstreams. + commitCounter++; + if (commitCounter % 100 == 0) { + context.dispatchEvents(); + // Commit actual changes to DB after dispatch events + System.out.print("Performing incremental commit to the database..."); + context.commit(); + System.out.println(" Incremental commit done!"); + } + + context.uncacheEntity(bitstream); + } + + System.out.print("Committing changes to the database..."); + context.complete(); + System.out.println(" Done!"); + } catch (SQLException | IOException sqle) { + // Aborting will leave the DB objects around, even if the + // bitstreams are deleted. This is OK; deleting them next + // time around will be a no-op. + if (verbose) { + System.err.println("Error: " + sqle.getMessage()); + } + context.abort(); + throw sqle; + } finally { + context.restoreAuthSystemState(); + } + } + + @Nullable + @Override + public Long getLastModified(Bitstream bitstream) throws IOException { + Map attrs = new HashMap(); + attrs.put("modified", null); + int storeNumber = this.whichStoreNumber(bitstream); + attrs = this.getStore(storeNumber).about(bitstream, attrs); + if (attrs == null || !attrs.containsKey("modified")) { + return null; + } + return Long.valueOf(attrs.get("modified").toString()); + } + + /** + * Decide which store number should be used for the given bitstream. + * If the bitstream is synchronized (stored in both S3 and locally), the incoming store number is used. + * Otherwise, the bitstream's store number is used. 
+ * + * @param bitstream bitstream + * @return store number + */ + public int whichStoreNumber(Bitstream bitstream) { + if (Objects.equals(bitstream.getStoreNumber(), SYNCHRONIZED_STORES_NUMBER)) { + return getIncoming(); + } else { + return bitstream.getStoreNumber(); + } + } + +} diff --git a/dspace-api/src/main/java/org/dspace/storage/bitstore/SyncS3BitStoreService.java b/dspace-api/src/main/java/org/dspace/storage/bitstore/SyncS3BitStoreService.java new file mode 100644 index 000000000000..cae46a512a56 --- /dev/null +++ b/dspace-api/src/main/java/org/dspace/storage/bitstore/SyncS3BitStoreService.java @@ -0,0 +1,101 @@ +/** + * The contents of this file are subject to the license and copyright + * detailed in the LICENSE and NOTICE files at the root of the source + * tree and available online at + * + * http://www.dspace.org/license/ + */ +package org.dspace.storage.bitstore; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.services.s3.transfer.Upload; +import org.apache.commons.io.FileUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.dspace.content.Bitstream; +import org.dspace.services.ConfigurationService; +import org.springframework.beans.factory.annotation.Autowired; + +/** + * Override of the S3BitStoreService to store all the data also in the local assetstore. 
+ * + * @author Milan Majchrak (milan.majchrak at dataquest.sk) + */ +public class SyncS3BitStoreService extends S3BitStoreService { + + /** + * log4j log + */ + private static final Logger log = LogManager.getLogger(SyncS3BitStoreService.class); + private boolean syncEnabled = false; + + @Autowired(required = true) + DSBitStoreService dsBitStoreService; + + @Autowired(required = true) + ConfigurationService configurationService; + + public SyncS3BitStoreService() { + super(); + } + + public void init() throws IOException { + super.init(); + syncEnabled = configurationService.getBooleanProperty("sync.storage.service.enabled", false); + } + + @Override + public void put(Bitstream bitstream, InputStream in) throws IOException { + String key = getFullKey(bitstream.getInternalId()); + //Copy istream to temp file, and send the file, with some metadata + File scratchFile = File.createTempFile(bitstream.getInternalId(), "s3bs"); + try { + FileUtils.copyInputStreamToFile(in, scratchFile); + long contentLength = scratchFile.length(); + // The ETag may or may not be and MD5 digest of the object data. 
+ // Therefore, we precalculate before uploading + String localChecksum = org.dspace.curate.Utils.checksum(scratchFile, CSA); + + Upload upload = tm.upload(getBucketName(), key, scratchFile); + + upload.waitForUploadResult(); + + bitstream.setSizeBytes(contentLength); + bitstream.setChecksum(localChecksum); + bitstream.setChecksumAlgorithm(CSA); + + if (syncEnabled) { + // Upload file into local assetstore + File localFile = dsBitStoreService.getFile(bitstream); + FileUtils.copyFile(scratchFile, localFile); + } + } catch (AmazonClientException | IOException | InterruptedException e) { + log.error("put(" + bitstream.getInternalId() + ", is)", e); + throw new IOException(e); + } finally { + if (!scratchFile.delete()) { + scratchFile.deleteOnExit(); + } + } + } + + @Override + public void remove(Bitstream bitstream) throws IOException { + String key = getFullKey(bitstream.getInternalId()); + try { + // Remove file from S3 + s3Service.deleteObject(getBucketName(), key); + if (syncEnabled) { + // Remove file from local assetstore + dsBitStoreService.remove(bitstream); + } + } catch (AmazonClientException e) { + log.error("remove(" + key + ")", e); + throw new IOException(e); + } + } +} diff --git a/dspace-api/src/test/data/dspaceFolder/config/local.cfg b/dspace-api/src/test/data/dspaceFolder/config/local.cfg index de6caf8880f7..ce3a6ccce07d 100644 --- a/dspace-api/src/test/data/dspaceFolder/config/local.cfg +++ b/dspace-api/src/test/data/dspaceFolder/config/local.cfg @@ -277,3 +277,6 @@ handle.canonical.prefix = ${dspace.ui.url}/handle/ ### File preview ### # File preview is enabled by default file.preview.enabled = true + +### Storage service ### +sync.storage.service.enabled = false diff --git a/dspace-server-webapp/src/main/java/org/dspace/app/rest/converter/BitstreamConverter.java b/dspace-server-webapp/src/main/java/org/dspace/app/rest/converter/BitstreamConverter.java index bb5544b3592c..e5d967afc8a4 100644 --- 
a/dspace-server-webapp/src/main/java/org/dspace/app/rest/converter/BitstreamConverter.java +++ b/dspace-server-webapp/src/main/java/org/dspace/app/rest/converter/BitstreamConverter.java @@ -45,6 +45,7 @@ public BitstreamRest convert(org.dspace.content.Bitstream obj, Projection projec checksum.setValue(obj.getChecksum()); b.setCheckSum(checksum); b.setSizeBytes(obj.getSizeBytes()); + b.setStoreNumber(obj.getStoreNumber()); return b; } diff --git a/dspace-server-webapp/src/main/java/org/dspace/app/rest/model/BitstreamRest.java b/dspace-server-webapp/src/main/java/org/dspace/app/rest/model/BitstreamRest.java index 8e9efc2680b7..232d96b044a0 100644 --- a/dspace-server-webapp/src/main/java/org/dspace/app/rest/model/BitstreamRest.java +++ b/dspace-server-webapp/src/main/java/org/dspace/app/rest/model/BitstreamRest.java @@ -46,6 +46,8 @@ public class BitstreamRest extends DSpaceObjectRest { @JsonProperty(access = Access.READ_ONLY) private Integer sequenceId; + private int storeNumber; + public String getBundleName() { return bundleName; } @@ -78,6 +80,14 @@ public void setSequenceId(Integer sequenceId) { this.sequenceId = sequenceId; } + public int getStoreNumber() { + return storeNumber; + } + + public void setStoreNumber(int storeNumber) { + this.storeNumber = storeNumber; + } + @Override public String getCategory() { return CATEGORY; diff --git a/dspace/config/clarin-dspace.cfg b/dspace/config/clarin-dspace.cfg index fe7fd16fbea0..478a22b9bbae 100644 --- a/dspace/config/clarin-dspace.cfg +++ b/dspace/config/clarin-dspace.cfg @@ -242,3 +242,7 @@ shibboleth.name.conversion.outputEncoding = UTF-8 ### File preview ### # File preview is enabled by default file.preview.enabled = false + +### Storage service ### +# When true, bitstreams uploaded to S3 are also copied to the local assetstore (treated as false if unset) +sync.storage.service.enabled = true diff --git a/dspace/config/spring/api/bitstore.xml b/dspace/config/spring/api/bitstore.xml index 1cf7d8f68a3c..f02edcbc0807 100644 --- a/dspace/config/spring/api/bitstore.xml +++ 
b/dspace/config/spring/api/bitstore.xml @@ -3,7 +3,7 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd" default-lazy-init="true"> - + @@ -17,7 +17,7 @@ - +