MODEXPW-487 - S3 Env Vars optimisation (#563)
* MODEXPW-487 - s3 envs updates

* MODEXPW-487 - s3 envs updates

* MODEXPW-487 - s3 envs updates

* MODEXPW-487 - s3 envs updates for tests

* MODEXPW-487 - s3 paths updates

* MODEXPW-487 - update eholdings tests

* MODEXPW-487 - rename folder to subPath

* MODEXPW-487 - update tests

* MODEXPW-487 - update testWriteReadPatchDelete()

* MODEXPW-487 - update testWriteReadPatchDelete()

* MODEXPW-487 - update composeObject()

* MODEXPW-487 - base files storage

* MODEXPW-487 - update readme

* MODEXPW-487 - update readme

* MODEXPW-487 - testContainsFile

* MODEXPW-487 - LocalFilesStorage with folioS3client

* Revert "MODEXPW-487 - LocalFilesStorage with folioS3client"

This reverts commit 1cff1b3.

* MODEXPW-487 - S3_IS_AWS

* MODEXPW-487 - remove unused env variable

* MODEXPW-487 - update readme
alekGbuz authored Sep 2, 2024
1 parent a4f5cef commit a3ab1b1
Showing 11 changed files with 142 additions and 82 deletions.
54 changes: 25 additions & 29 deletions README.md
@@ -32,34 +32,30 @@ AWS container: memory - 3072, memory (soft limit) - 2600, cpu - 1024.
This module uses a separate storage of temporary (local) files for its work. These files are necessary for processing bulk-edit business flows.
Any S3-compatible storage (AWS S3, MinIO Server) supported by the MinIO client can be used as such storage. Thus, in addition to the
AWS configuration (AWS_URL, AWS_REGION, AWS_BUCKET, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY) of the permanent storage,
one needs to configure the environment settings for the temporary files storage (LOCAL_FS_URL, LOCAL_FS_REGION, LOCAL_FS_BUCKET, LOCAL_FS_ACCESS_KEY_ID, LOCAL_FS_SECRET_ACCESS_KEY).
Typically, these options should point to a separate storage. A single storage can also be used both for processing results and for temporary files,
but in that case different buckets must be used.
It is also necessary to set the LOCAL_FS_COMPOSE_WITH_AWS_SDK variable to indicate whether AWS S3 is used as the files storage. By default this variable is `false`, meaning a MinIO server is used as the files storage.
one needs to configure the environment settings for the S3 subpaths (S3_SUB_PATH, S3_LOCAL_SUB_PATH).
Typically, these options must specify separate paths (see the configuration example after the table below).
It is also necessary to set the S3_IS_AWS variable to indicate whether AWS S3 is used as the files storage. By default this variable is `false`, meaning a MinIO server is used as the files storage.
This value should be `true` if AWS S3 is used as storage.

| Name | Default value | Description |
|:--------------------------------------------------|:-----------------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| KAFKA_HOST | localhost | Kafka broker hostname |
| KAFKA_PORT | 9092 | Kafka broker port |
| KAFKA_CONSUMER_POLL_INTERVAL | 3600000 | Max interval before next poll. If long record processing is in place and interval exceeded then consumer will be kicked out of the group and another consumer will start processing the same message. |
| ENV | folio | Environment name |
| AWS_URL | http://127.0.0.1:9000/ | AWS url |
| AWS_REGION | - | AWS region |
| AWS_BUCKET | - | AWS bucket |
| AWS_ACCESS_KEY_ID | - | AWS access key |
| AWS_SECRET_ACCESS_KEY | - | AWS secret key |
| LOCAL_FS_URL | http://127.0.0.1:9000/ | S3-compatible local files storage url |
| LOCAL_FS_REGION | - | S3-compatible local files storage region |
| LOCAL_FS_BUCKET | - | S3-compatible local files storage bucket |
| LOCAL_FS_ACCESS_KEY_ID | - | S3-compatible local files storage access key |
| LOCAL_FS_SECRET_ACCESS_KEY | - | S3-compatible local files storage secret key |
| URL_EXPIRATION_TIME | 604800 | Presigned url expiration time (in seconds) |
| DATA_EXPORT_JOB_UPDATE_TOPIC_PARTITIONS | 50 | Number of partitions for topic |
| KAFKA_CONCURRENCY_LEVEL | 30 | Concurrency level of kafka listener |
| LOCAL_FS_COMPOSE_WITH_AWS_SDK | false | Specify if AWS S3 is used as local files storage |
| E_HOLDINGS_BATCH_JOB_CHUNK_SIZE | 100 | Specify chunk size for eHoldings export job which will be used to query data from kb-ebsco, write to database, read from database and write to file |
| E_HOLDINGS_BATCH_KB_EBSCO_CHUNK_SIZE | 100 | Amount to retrieve per request to mod-kb-ebsco-java (100 is max acceptable value) |
| AUTHORITY_CONTROL_BATCH_JOB_CHUNK_SIZE | 100 | Specify chunk size for authority control export job which will be used to query data from entities-links, and write to file |
| AUTHORITY_CONTROL_BATCH_ENTITIES_LINKS_CHUNK_SIZE | 100 | Amount to retrieve per request to mod-entities-links |
| MAX_UPLOADED_FILE_SIZE | 40MB | Specifies multipart upload file size |
| Name                                               | Default value                 | Description |
|:---------------------------------------------------|:------------------------------|:------------|
| KAFKA_HOST                                         | localhost                     | Kafka broker hostname |
| KAFKA_PORT                                         | 9092                          | Kafka broker port |
| KAFKA_CONSUMER_POLL_INTERVAL                       | 3600000                       | Max interval before the next poll. If long record processing is in place and the interval is exceeded, the consumer is kicked out of the group and another consumer starts processing the same message. |
| ENV                                                | folio                         | Environment name |
| S3_URL                                             | http://127.0.0.1:9000/        | S3-compatible storage URL |
| S3_REGION                                          | -                             | S3-compatible storage region |
| S3_BUCKET                                          | -                             | S3-compatible storage bucket |
| S3_ACCESS_KEY_ID                                   | -                             | S3-compatible storage access key |
| S3_SECRET_ACCESS_KEY                               | -                             | S3-compatible storage secret key |
| S3_SUB_PATH                                        | mod-data-export-worker/remote | S3 subpath for the files storage |
| S3_LOCAL_SUB_PATH                                  | mod-data-export-worker/local  | S3 subpath for the local (temporary) files storage |
| S3_IS_AWS                                          | false                         | Specifies whether AWS S3 is used as the files storage |
| URL_EXPIRATION_TIME                                | 604800                        | Presigned URL expiration time (in seconds) |
| DATA_EXPORT_JOB_UPDATE_TOPIC_PARTITIONS            | 50                            | Number of partitions for the topic |
| KAFKA_CONCURRENCY_LEVEL                            | 30                            | Concurrency level of the Kafka listener |
| E_HOLDINGS_BATCH_JOB_CHUNK_SIZE                    | 100                           | Chunk size for the eHoldings export job, used to query data from kb-ebsco, write to the database, read from the database, and write to the file |
| E_HOLDINGS_BATCH_KB_EBSCO_CHUNK_SIZE               | 100                           | Amount to retrieve per request to mod-kb-ebsco-java (100 is the maximum acceptable value) |
| AUTHORITY_CONTROL_BATCH_JOB_CHUNK_SIZE             | 100                           | Chunk size for the authority control export job, used to query data from entities-links and write to the file |
| AUTHORITY_CONTROL_BATCH_ENTITIES_LINKS_CHUNK_SIZE  | 100                           | Amount to retrieve per request to mod-entities-links |
| MAX_UPLOADED_FILE_SIZE                             | 40MB                          | Maximum multipart upload file size |
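
For illustration, a minimal environment configuration for running both storages against a single MinIO server could look like the following (the region, bucket, and key values are hypothetical, not defaults):

```properties
S3_URL=http://127.0.0.1:9000/
S3_REGION=us-east-1
S3_BUCKET=example-bucket
S3_ACCESS_KEY_ID=example-access-key
S3_SECRET_ACCESS_KEY=example-secret-key
S3_SUB_PATH=mod-data-export-worker/remote
S3_LOCAL_SUB_PATH=mod-data-export-worker/local
S3_IS_AWS=false
```

Because the S3 connection settings are shared, permanent results and temporary (local) files end up in the same bucket and are separated only by the two sub-paths.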
src/main/java/org/folio/dew/config/properties/MinioClientProperties.java
@@ -33,6 +33,12 @@ public class MinioClientProperties {
* Key that enables file merging in storage using AWS SDK capabilities.
*/
private boolean composeWithAwsSdk;

/**
* Path in the S3 bucket.
*/
private String subPath;

/**
* Presigned url expiration time (in seconds).
*/
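
For context, a hypothetical wiring of the new `subPath` property into the storage could look like this (the setter names are assumed from the Lombok-style accessors used elsewhere in this diff and are not shown here; the values are illustrative):

```java
// Assumed Lombok-generated setters on MinioClientProperties (not shown in this diff).
MinioClientProperties properties = new MinioClientProperties();
properties.setEndpoint("http://127.0.0.1:9000/");       // S3_URL
properties.setRegion("us-east-1");                      // S3_REGION (illustrative)
properties.setBucket("example-bucket");                 // S3_BUCKET (illustrative)
properties.setAccessKey("example-access-key");          // S3_ACCESS_KEY_ID (illustrative)
properties.setSecretKey("example-secret-key");          // S3_SECRET_ACCESS_KEY (illustrative)
properties.setSubPath("mod-data-export-worker/local");  // S3_LOCAL_SUB_PATH
properties.setComposeWithAwsSdk(false);                 // S3_IS_AWS

BaseFilesStorage storage = new BaseFilesStorage(properties);
```

Every path passed to such a storage is then transparently prefixed with `mod-data-export-worker/local` by `getS3Path()` (see the sketch at the end of the BaseFilesStorage diff below).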
32 changes: 26 additions & 6 deletions src/main/java/org/folio/dew/repository/BaseFilesStorage.java
@@ -20,7 +20,6 @@
import java.util.ArrayList;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.StringUtils;
import org.folio.dew.config.properties.MinioClientProperties;
import org.folio.dew.error.FileOperationException;
@@ -58,14 +57,18 @@

import static io.minio.ObjectWriteArgs.MIN_MULTIPART_SIZE;
import static java.lang.String.format;
import static org.folio.dew.utils.Constants.PATH_SEPARATOR;

@Log4j2
public class BaseFilesStorage implements S3CompatibleStorage {

private static final String SET_VALUE = "<set>";
private static final String NOT_SET_VALUE = "<not set>";
private final MinioClient client;
private S3Client s3Client;
private final String bucket;
private final String region;
private final String subPath;

private final boolean isComposeWithAwsSdk;

@@ -75,9 +78,11 @@ public BaseFilesStorage(MinioClientProperties properties) {
final String regionName = properties.getRegion();
final String bucketName = properties.getBucket();
final String secretKey = properties.getSecretKey();
subPath = properties.getSubPath();
isComposeWithAwsSdk = properties.isComposeWithAwsSdk();
log.info("Creating MinIO client endpoint {},region {},bucket {},accessKey {},secretKey {}, isComposedWithAwsSdk {}.", endpoint, regionName, bucketName,
StringUtils.isNotBlank(accessKey) ? "<set>" : "<not set>", StringUtils.isNotBlank(secretKey) ? "<set>" : "<not set>", isComposeWithAwsSdk);
log.info("Creating MinIO client endpoint {},region {},bucket {},accessKey {},secretKey {}, subPath {}, isComposedWithAwsSdk {}.", endpoint, regionName, bucketName,
StringUtils.isNotBlank(accessKey) ? SET_VALUE : NOT_SET_VALUE, StringUtils.isNotBlank(secretKey) ? SET_VALUE : NOT_SET_VALUE,
StringUtils.isNotBlank(subPath) ? SET_VALUE : NOT_SET_VALUE, isComposeWithAwsSdk);

var builder = MinioClient.builder().endpoint(endpoint);
if (StringUtils.isNotBlank(regionName)) {
@@ -148,6 +153,7 @@ public void createBucketIfNotExists() {
* @throws IOException - if an I/O error occurs
*/
public String upload(String path, String filename) throws IOException {
path = getS3Path(path);
try {
return client.uploadObject(UploadObjectArgs.builder()
.bucket(bucket)
@@ -171,7 +177,7 @@ public String upload(String path, String filename) throws IOException {
* @throws IOException - if an I/O error occurs
*/
public String write(String path, byte[] bytes, Map<String, String> headers) throws IOException {

path = getS3Path(path);
if (isComposeWithAwsSdk) {
log.info("Writing with using AWS SDK client");
s3Client.putObject(PutObjectRequest.builder().bucket(bucket)
@@ -210,7 +216,7 @@ public String write(String path, byte[] bytes) throws IOException {
* @throws IOException - if an I/O error occurs
*/
public String writeFile(String path, Path inputPath, Map<String, String> headers) throws IOException {

path = getS3Path(path);
if (isComposeWithAwsSdk) {
log.info("Writing file using AWS SDK client");
s3Client.putObject(PutObjectRequest.builder().bucket(bucket)
@@ -247,6 +253,7 @@ public String writeFile(String destPath, Path inputPath) throws IOException {
* @throws IOException if an I/O error occurs
*/
public void append(String path, byte[] bytes) throws IOException {
path = getS3Path(path);
try {
if (notExists(path)) {
log.info("Appending non-existing file");
@@ -348,6 +355,7 @@ public void append(String path, byte[] bytes) throws IOException {
* @throws FileOperationException if an I/O error occurs
*/
public void delete(String path) {
path = getS3Path(path);
try {
var paths = walk(path).collect(Collectors.toList());

@@ -376,7 +384,7 @@ public void delete(String path) {
* @throws FileOperationException if an I/O error occurs
*/
public Stream<String> walk(String path) {
return getInternalStructure(path, true);
return getInternalStructure(getS3Path(path), true);
}

/**
@@ -386,6 +394,7 @@ public Stream<String> walk(String path) {
* @return true if file exists, otherwise - false
*/
public boolean exists(String path) {
path = getS3Path(path);
var iterator = client.listObjects(ListObjectsArgs.builder()
.bucket(bucket)
.region(region)
@@ -418,6 +427,7 @@ public boolean notExists(String path) {
* @throws IOException - if an I/O error occurs reading from the file
*/
public InputStream newInputStream(String path) throws IOException {
path = getS3Path(path);
try {
return client.getObject(GetObjectArgs.builder()
.bucket(bucket)
@@ -545,4 +555,14 @@ private Stream<String> getInternalStructure(String path, boolean isRecursive) {
return null;
}
}

public String getS3Path(String path) {
if (StringUtils.isBlank(subPath) || StringUtils.startsWith(path, subPath + PATH_SEPARATOR)) {
return path;
}
if (path.startsWith(PATH_SEPARATOR)) {
return subPath + path;
}
return subPath + PATH_SEPARATOR + path;
}
}
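
To make the prefixing rule above concrete, here is a standalone sketch of the `getS3Path()` logic (assuming `Constants.PATH_SEPARATOR` is `"/"`, which is not shown in this diff):

```java
// Standalone sketch of the getS3Path() prefixing rule.
// Assumption: Constants.PATH_SEPARATOR is "/" (not shown in this diff).
public class S3PathSketch {

  private static final String PATH_SEPARATOR = "/";

  static String getS3Path(String subPath, String path) {
    // Blank sub-path, or path already prefixed: return unchanged.
    // The second check makes the method idempotent.
    if (subPath == null || subPath.isBlank() || path.startsWith(subPath + PATH_SEPARATOR)) {
      return path;
    }
    // A leading separator is already present, so just prepend the sub-path.
    if (path.startsWith(PATH_SEPARATOR)) {
      return subPath + path;
    }
    return subPath + PATH_SEPARATOR + path;
  }

  public static void main(String[] args) {
    String subPath = "mod-data-export-worker/local";
    System.out.println(getS3Path(subPath, "jobs/1.csv"));  // mod-data-export-worker/local/jobs/1.csv
    System.out.println(getS3Path(subPath, "/jobs/1.csv")); // mod-data-export-worker/local/jobs/1.csv
    // Re-applying the rule is a no-op:
    System.out.println(getS3Path(subPath, getS3Path(subPath, "jobs/1.csv")));
  }
}
```

The idempotence matters because some methods apply the prefix twice: for example, `delete(String path)` calls `getS3Path(path)` and then `walk(path)`, which prefixes the path again inside `getInternalStructure()`; the `startsWith(subPath + PATH_SEPARATOR)` guard keeps the second application harmless.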
