diff --git a/src/main/java/eu/dissco/exportjob/configuration/ApplicationConfiguration.java b/src/main/java/eu/dissco/exportjob/configuration/ApplicationConfiguration.java index 93cfbd5..3f8377a 100644 --- a/src/main/java/eu/dissco/exportjob/configuration/ApplicationConfiguration.java +++ b/src/main/java/eu/dissco/exportjob/configuration/ApplicationConfiguration.java @@ -2,6 +2,8 @@ import com.fasterxml.jackson.annotation.JsonInclude.Include; import com.fasterxml.jackson.databind.ObjectMapper; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; import lombok.RequiredArgsConstructor; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -16,4 +18,10 @@ public ObjectMapper objectMapper() { mapper.setSerializationInclusion(Include.NON_NULL); return mapper; } + + @Bean + public DateTimeFormatter formatter() { + return DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC); + } + } diff --git a/src/main/java/eu/dissco/exportjob/repository/ElasticSearchRepository.java b/src/main/java/eu/dissco/exportjob/repository/ElasticSearchRepository.java index dc57fc5..36910b4 100644 --- a/src/main/java/eu/dissco/exportjob/repository/ElasticSearchRepository.java +++ b/src/main/java/eu/dissco/exportjob/repository/ElasticSearchRepository.java @@ -1,8 +1,8 @@ package eu.dissco.exportjob.repository; import co.elastic.clients.elasticsearch.ElasticsearchClient; +import co.elastic.clients.elasticsearch._types.SortOrder; import co.elastic.clients.elasticsearch._types.query_dsl.Query; -import co.elastic.clients.elasticsearch.core.CountRequest; import co.elastic.clients.elasticsearch.core.SearchRequest; import co.elastic.clients.elasticsearch.core.search.Hit; import com.fasterxml.jackson.databind.JsonNode; @@ -22,36 +22,34 @@ @Repository @RequiredArgsConstructor public class ElasticSearchRepository { + private final ElasticsearchClient client; private final ElasticSearchProperties properties; - - public Long getTotalHits(List searchParams, TargetType targetType) - throws IOException { - var query = generateQuery(searchParams); - var index = getIndex(targetType); - var countRequest = new CountRequest.Builder() - .index(index) - .query( - q -> q.bool(b -> b.must(query))) - .build(); - return client.count(countRequest).count(); - } + private static final String SORT_BY = "dcterms:identifier.keyword"; public List getTargetObjects(List searchParams, TargetType targetType, - int pageNumber) + String lastId, List targetFields) throws IOException { var query = generateQuery(searchParams); var index = getIndex(targetType); - var searchRequest = new SearchRequest.Builder() + var searchRequestBuilder = new SearchRequest.Builder() .index(index) .query( q -> q.bool(b -> b.must(query))) .trackTotalHits(t -> t.enabled(Boolean.TRUE)) - .from(getOffset(pageNumber, properties.getPageSize())) .size(properties.getPageSize()) - .build(); - var searchResult = client.search(searchRequest, ObjectNode.class); + .sort(s -> s.field(f -> f.field(SORT_BY).order(SortOrder.Desc))); + if (lastId != null) { + searchRequestBuilder + .searchAfter(sa -> sa.stringValue(lastId)); + } + if (targetFields != null) { + searchRequestBuilder + .source(sourceConfig -> sourceConfig + .filter(filter -> filter.includes(targetFields))); + } + var searchResult = client.search(searchRequestBuilder.build(), ObjectNode.class); return searchResult.hits().hits().stream() .map(Hit::source) @@ -65,7 +63,6 @@ private String getIndex(TargetType targetType) { : properties.getDigitalMediaObjectIndex(); } - private static List generateQuery(List searchParams) { var qList = new ArrayList(); for (var searchParam : searchParams) { @@ -88,12 +85,4 @@ private static List generateQuery(List searchParams) { return qList; } - private static int getOffset(int pageNumber, int pageSize) { - int offset = 0; - if (pageNumber > 1) { - offset = offset + (pageSize * (pageNumber - 1)); - } - return offset; - } - } diff --git a/src/main/java/eu/dissco/exportjob/repository/S3Repository.java b/src/main/java/eu/dissco/exportjob/repository/S3Repository.java index b548667..89c993f 100644 --- a/src/main/java/eu/dissco/exportjob/repository/S3Repository.java +++ b/src/main/java/eu/dissco/exportjob/repository/S3Repository.java @@ -1,6 +1,8 @@ package eu.dissco.exportjob.repository; import java.io.File; +import java.time.Instant; +import java.time.format.DateTimeFormatter; import java.util.UUID; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Repository; @@ -12,6 +14,7 @@ public class S3Repository { private final S3AsyncClient s3Client; + private final DateTimeFormatter formatter; private static final String BUCKET_NAME = "dissco-data-export"; public String uploadResults(File file, UUID jobId) { @@ -20,7 +23,7 @@ public String uploadResults(File file, UUID jobId) { .uploadFile(uploadFileRequest -> uploadFileRequest .putObjectRequest(putObjectRequest -> putObjectRequest .bucket(BUCKET_NAME) - .key(jobId.toString())) + .key(getDate() + "/" + jobId.toString())) .source(file)); upload.completionFuture().join(); return s3Client.utilities().getUrl( @@ -31,5 +34,8 @@ public String uploadResults(File file, UUID jobId) { } } + private String getDate() { + return formatter.format(Instant.now()); + } } diff --git a/src/main/java/eu/dissco/exportjob/service/AbstractExportJobService.java b/src/main/java/eu/dissco/exportjob/service/AbstractExportJobService.java index 596f7e2..fe63996 100644 --- a/src/main/java/eu/dissco/exportjob/service/AbstractExportJobService.java +++ b/src/main/java/eu/dissco/exportjob/service/AbstractExportJobService.java @@ -1,5 +1,8 @@ package eu.dissco.exportjob.service; +import static eu.dissco.exportjob.service.DoiListService.ID_FIELD; + +import co.elastic.clients.elasticsearch._types.query_dsl.FieldAndFormat; import com.fasterxml.jackson.databind.JsonNode; import eu.dissco.exportjob.domain.JobRequest; import eu.dissco.exportjob.domain.JobStateEndpoint; @@ -23,13 +26,15 @@ public abstract class AbstractExportJobService { private final ExporterBackendClient exporterBackendClient; private final S3Repository s3Repository; protected static final String TEMP_FILE_NAME = "src/main/resources/tmp.csv.gz"; + protected static final String ID_FIELD = "dcterms:identifier"; + protected static final String PHYSICAL_ID_FIELD = "ods:physicalSpecimenID"; public void handleMessage(JobRequest jobRequest) throws FailedProcessingException { exporterBackendClient.updateJobState(jobRequest.jobId(), JobStateEndpoint.RUNNING); try { var uploadData = processSearchResults(jobRequest); - if (uploadData){ + if (uploadData) { var url = s3Repository.uploadResults(new File(TEMP_FILE_NAME), jobRequest.jobId()); exporterBackendClient.markJobAsComplete(jobRequest.jobId(), url); } else { @@ -42,26 +47,29 @@ public void handleMessage(JobRequest jobRequest) throws FailedProcessingExceptio } private boolean processSearchResults(JobRequest jobRequest) throws IOException { - var totalHits = elasticSearchRepository.getTotalHits(jobRequest.searchParams(), - jobRequest.targetType()); - if (totalHits > 0){ - int pageNum = 1; - var hitsProcessed = 0; - writeHeaderToFile(); - while (hitsProcessed < totalHits) { - var searchResult = elasticSearchRepository.getTargetObjects(jobRequest.searchParams(), - jobRequest.targetType(), pageNum); + String lastId = null; + writeHeaderToFile(); + boolean keepSearching = true; + boolean resultsProcessed = false; + var targetFields = targetFields(); + while (keepSearching) { + var searchResult = elasticSearchRepository.getTargetObjects(jobRequest.searchParams(), + jobRequest.targetType(), lastId, targetFields); + if (searchResult.isEmpty()){ + keepSearching = false; + } else { writeResultsToFile(searchResult); - hitsProcessed = hitsProcessed + searchResult.size(); - pageNum = pageNum + 1; + lastId = searchResult.getLast().get(ID_FIELD).asText(); + resultsProcessed = true; } - return true; } - return false; + return resultsProcessed; } protected abstract void writeHeaderToFile() throws IOException; protected abstract void writeResultsToFile(List searchResult) throws IOException; + protected abstract List targetFields(); + } diff --git a/src/main/java/eu/dissco/exportjob/service/DoiListService.java b/src/main/java/eu/dissco/exportjob/service/DoiListService.java index 57a3921..1624ea7 100644 --- a/src/main/java/eu/dissco/exportjob/service/DoiListService.java +++ b/src/main/java/eu/dissco/exportjob/service/DoiListService.java @@ -17,9 +17,7 @@ @Profile(Profiles.DOI_LIST) public class DoiListService extends AbstractExportJobService { - private static final String ODS_ID = "ods:ID"; - private static final String PHYSICAL_ID = "ods:physicalSpecimenID"; - private static final byte[] HEADER = (ODS_ID + "," + PHYSICAL_ID).getBytes( + private static final byte[] HEADER = (ID_FIELD + "," + PHYSICAL_ID_FIELD).getBytes( StandardCharsets.UTF_8); public DoiListService( @@ -41,10 +39,16 @@ protected void writeResultsToFile(List searchResults) throws IOExcepti var byteOutputStream = new FileOutputStream(TEMP_FILE_NAME, true); var gzip = new GZIPOutputStream(byteOutputStream)) { for (var result : searchResults) { - var col = ("\n" + result.get(ODS_ID).asText() + "," + result.get(PHYSICAL_ID).asText()) + var col = ("\n" + result.get(ID_FIELD).asText() + "," + result.get(PHYSICAL_ID_FIELD).asText()) .getBytes(StandardCharsets.UTF_8); gzip.write(col, 0, col.length); } } } + + protected List targetFields(){ + return List.of(ID_FIELD, PHYSICAL_ID_FIELD); + } + + } diff --git a/src/main/resources/tmp.csv.gz b/src/main/resources/tmp.csv.gz index e69de29..9056a7e 100644 Binary files a/src/main/resources/tmp.csv.gz and b/src/main/resources/tmp.csv.gz differ diff --git a/src/test/java/eu/dissco/exportjob/repository/ElasticSearchRepositoryTest.java b/src/test/java/eu/dissco/exportjob/repository/ElasticSearchRepositoryTest.java index 43ffe67..a89a748 100644 --- a/src/test/java/eu/dissco/exportjob/repository/ElasticSearchRepositoryTest.java +++ b/src/test/java/eu/dissco/exportjob/repository/ElasticSearchRepositoryTest.java @@ -2,10 +2,13 @@ import static eu.dissco.exportjob.utils.TestUtils.DOI_2; import static eu.dissco.exportjob.utils.TestUtils.MAPPER; +import static eu.dissco.exportjob.utils.TestUtils.ORG_1; import static eu.dissco.exportjob.utils.TestUtils.ORG_2; import static eu.dissco.exportjob.utils.TestUtils.PHYS_ID_2; import static eu.dissco.exportjob.utils.TestUtils.givenDigitalSpecimen; +import static eu.dissco.exportjob.utils.TestUtils.givenDigitalSpecimenReducedDoiList; import static eu.dissco.exportjob.utils.TestUtils.givenSearchParams; +import static eu.dissco.exportjob.utils.TestUtils.givenTargetFields; import static org.assertj.core.api.Assertions.assertThat; import co.elastic.clients.elasticsearch.ElasticsearchClient; @@ -99,30 +102,31 @@ void clearIndex() throws IOException { } @Test - void testGetTotalHits() throws IOException { + void testGetTargetObject() throws IOException { // Given postDigitalSpecimens( List.of(givenDigitalSpecimen(), givenDigitalSpecimen(DOI_2, ORG_2, PHYS_ID_2))); // When - var result = elasticRepository.getTotalHits(givenSearchParams(), TargetType.DIGITAL_SPECIMEN); + var result = elasticRepository.getTargetObjects(givenSearchParams(), + TargetType.DIGITAL_SPECIMEN, null, givenTargetFields()); // Then - assertThat(result).isEqualTo(1L); + assertThat(result).isEqualTo(List.of(givenDigitalSpecimenReducedDoiList())); } @Test - void testGetTargetObject() throws IOException { + void testGetTargetObjectSecondPage() throws IOException { // Given postDigitalSpecimens( - List.of(givenDigitalSpecimen(), givenDigitalSpecimen(DOI_2, ORG_2, PHYS_ID_2))); + List.of(givenDigitalSpecimen(), givenDigitalSpecimen(DOI_2, ORG_1, PHYS_ID_2))); // When var result = elasticRepository.getTargetObjects(givenSearchParams(), - TargetType.DIGITAL_SPECIMEN, 1); + TargetType.DIGITAL_SPECIMEN, DOI_2, givenTargetFields()); // Then - assertThat(result).isEqualTo(List.of(givenDigitalSpecimen())); + assertThat(result).isEqualTo(List.of(givenDigitalSpecimenReducedDoiList())); } @Test @@ -135,7 +139,7 @@ void testGetTargetObjectEmptyOrg() throws IOException { var searchParam = List.of(new SearchParam("$['ods:organisationID']", null)); // When - var result = elasticRepository.getTargetObjects(searchParam, TargetType.DIGITAL_SPECIMEN, 1); + var result = elasticRepository.getTargetObjects(searchParam, TargetType.DIGITAL_SPECIMEN, null, null); // Then assertThat(result).isEqualTo(List.of(expected)); diff --git a/src/test/java/eu/dissco/exportjob/service/DoiListServiceTest.java b/src/test/java/eu/dissco/exportjob/service/DoiListServiceTest.java index c196d0c..fb609d1 100644 --- a/src/test/java/eu/dissco/exportjob/service/DoiListServiceTest.java +++ b/src/test/java/eu/dissco/exportjob/service/DoiListServiceTest.java @@ -6,7 +6,6 @@ import static eu.dissco.exportjob.utils.TestUtils.givenDigitalSpecimen; import static eu.dissco.exportjob.utils.TestUtils.givenJobRequest; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.then; @@ -43,8 +42,8 @@ void init() { @Test void testHandleMessageNoResultsFound() throws Exception { // Given - given(elasticSearchRepository.getTotalHits(any(), any())).willReturn(0L); - + given(elasticSearchRepository.getTargetObjects(any(), any(), eq(null), any())).willReturn( + List.of()); // When service.handleMessage(givenJobRequest()); @@ -57,8 +56,7 @@ void testHandleMessageNoResultsFound() throws Exception { @Test void testHandleMessage() throws Exception { // GIven - given(elasticSearchRepository.getTotalHits(any(), any())).willReturn(1L); - given(elasticSearchRepository.getTargetObjects(any(), any(), anyInt())).willReturn( + given(elasticSearchRepository.getTargetObjects(any(), any(), eq(null), any())).willReturn( List.of(givenDigitalSpecimen())); given(s3Repository.uploadResults(any(), eq(JOB_ID))).willReturn(DOWNLOAD_LINK); diff --git a/src/test/java/eu/dissco/exportjob/utils/TestUtils.java b/src/test/java/eu/dissco/exportjob/utils/TestUtils.java index 2ad6838..c787931 100644 --- a/src/test/java/eu/dissco/exportjob/utils/TestUtils.java +++ b/src/test/java/eu/dissco/exportjob/utils/TestUtils.java @@ -25,6 +25,8 @@ private TestUtils() { .setSerializationInclusion(Include.NON_NULL); public static final String DOWNLOAD_LINK = "https://aws.download/s3"; public static final String ORG_FIELD_NAME = "$['ods:organisationID']"; + public static final String ID_FIELD = "dcterms:identifier"; + public static final String PHYS_ID_FIELD = "ods:physicalSpecimenID"; public static JobRequest givenJobRequest() { return new JobRequest( @@ -45,9 +47,23 @@ public static JsonNode givenDigitalSpecimen(){ public static JsonNode givenDigitalSpecimen(String doi, String org, String physId){ return MAPPER.createObjectNode() - .put("ods:ID", doi) + .put(ID_FIELD, doi) .put("@id", doi) .put("ods:organisationID", org) - .put("ods:physicalSpecimenID", physId); + .put(PHYS_ID_FIELD, physId); + } + + public static JsonNode givenDigitalSpecimenReducedDoiList(){ + return givenDigitalSpecimenReducedDoiList(DOI_1, PHYS_ID_1); + } + + public static JsonNode givenDigitalSpecimenReducedDoiList(String doi, String physId){ + return MAPPER.createObjectNode() + .put(ID_FIELD, doi) + .put(PHYS_ID_FIELD, physId); + } + + public static List givenTargetFields(){ + return List.of(ID_FIELD, PHYS_ID_FIELD); } }