From c80373929ec855d1f690491c6928b195ef06ea25 Mon Sep 17 00:00:00 2001 From: southeo Date: Thu, 17 Oct 2024 12:59:21 +0200 Subject: [PATCH 01/12] init --- .../exportjob/DisscoExportJobApplication.java | 15 +++ .../ApplicationConfiguration.java | 19 ++++ .../ElasticSearchConfiguration.java | 40 +++++++ .../configuration/WebClientConfiguration.java | 31 ++++++ .../dissco/exportjob/domain/JobRequest.java | 13 +++ .../eu/dissco/exportjob/domain/JobType.java | 7 ++ .../dissco/exportjob/domain/SearchParam.java | 8 ++ .../dissco/exportjob/domain/TargetType.java | 23 ++++ .../exceptions/ElasticSearchException.java | 8 ++ .../properties/ElasticSearchProperties.java | 32 ++++++ .../repository/ElasticSearchRepository.java | 100 ++++++++++++++++++ .../exportjob/service/ExportJobService.java | 98 +++++++++++++++++ .../exportjob/web/ExporterBackendClient.java | 48 +++++++++ .../DisscoExportJobApplicationTests.java | 13 --- .../service/ExportJobServiceTest.java | 26 +++++ .../eu/dissco/exportjob/utils/TestUtils.java | 36 +++++++ 16 files changed, 504 insertions(+), 13 deletions(-) create mode 100644 src/main/java/eu/dissco/exportjob/DisscoExportJobApplication.java create mode 100644 src/main/java/eu/dissco/exportjob/configuration/ApplicationConfiguration.java create mode 100644 src/main/java/eu/dissco/exportjob/configuration/ElasticSearchConfiguration.java create mode 100644 src/main/java/eu/dissco/exportjob/configuration/WebClientConfiguration.java create mode 100644 src/main/java/eu/dissco/exportjob/domain/JobRequest.java create mode 100644 src/main/java/eu/dissco/exportjob/domain/JobType.java create mode 100644 src/main/java/eu/dissco/exportjob/domain/SearchParam.java create mode 100644 src/main/java/eu/dissco/exportjob/domain/TargetType.java create mode 100644 src/main/java/eu/dissco/exportjob/exceptions/ElasticSearchException.java create mode 100644 src/main/java/eu/dissco/exportjob/properties/ElasticSearchProperties.java create mode 100644 src/main/java/eu/dissco/exportjob/repository/ElasticSearchRepository.java create mode 100644 src/main/java/eu/dissco/exportjob/service/ExportJobService.java create mode 100644 src/main/java/eu/dissco/exportjob/web/ExporterBackendClient.java delete mode 100644 src/test/java/eu/dissco/exportjob/DisscoExportJobApplicationTests.java create mode 100644 src/test/java/eu/dissco/exportjob/service/ExportJobServiceTest.java create mode 100644 src/test/java/eu/dissco/exportjob/utils/TestUtils.java diff --git a/src/main/java/eu/dissco/exportjob/DisscoExportJobApplication.java b/src/main/java/eu/dissco/exportjob/DisscoExportJobApplication.java new file mode 100644 index 0000000..bb2ebb7 --- /dev/null +++ b/src/main/java/eu/dissco/exportjob/DisscoExportJobApplication.java @@ -0,0 +1,15 @@ +package eu.dissco.exportjob; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.properties.ConfigurationPropertiesScan; + +@SpringBootApplication +@ConfigurationPropertiesScan +public class DisscoExportJobApplication { + + public static void main(String[] args) { + SpringApplication.run(DisscoExportJobApplication.class, args); + } + +} diff --git a/src/main/java/eu/dissco/exportjob/configuration/ApplicationConfiguration.java b/src/main/java/eu/dissco/exportjob/configuration/ApplicationConfiguration.java new file mode 100644 index 0000000..93cfbd5 --- /dev/null +++ b/src/main/java/eu/dissco/exportjob/configuration/ApplicationConfiguration.java @@ -0,0 +1,19 @@ +package 
eu.dissco.exportjob.configuration; + +import com.fasterxml.jackson.annotation.JsonInclude.Include; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.RequiredArgsConstructor; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +@RequiredArgsConstructor +public class ApplicationConfiguration { + + @Bean + public ObjectMapper objectMapper() { + var mapper = new ObjectMapper().findAndRegisterModules(); + mapper.setSerializationInclusion(Include.NON_NULL); + return mapper; + } +} diff --git a/src/main/java/eu/dissco/exportjob/configuration/ElasticSearchConfiguration.java b/src/main/java/eu/dissco/exportjob/configuration/ElasticSearchConfiguration.java new file mode 100644 index 0000000..da23691 --- /dev/null +++ b/src/main/java/eu/dissco/exportjob/configuration/ElasticSearchConfiguration.java @@ -0,0 +1,40 @@ +package eu.dissco.exportjob.configuration; + +import eu.dissco.exportjob.properties.ElasticSearchProperties; + + +import co.elastic.clients.elasticsearch.ElasticsearchClient; +import co.elastic.clients.json.jackson.JacksonJsonpMapper; +import co.elastic.clients.transport.ElasticsearchTransport; +import co.elastic.clients.transport.rest_client.RestClientTransport; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.RequiredArgsConstructor; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.elasticsearch.client.RestClient; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +@RequiredArgsConstructor +public class ElasticSearchConfiguration { + private final ElasticSearchProperties properties; + private final ObjectMapper mapper; + + @Bean + public ElasticsearchClient elasticsearchClient() { + var credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(AuthScope.ANY, + new UsernamePasswordCredentials(properties.getUsername(), properties.getPassword())); + RestClient restClient = RestClient.builder(new HttpHost(properties.getHostname(), + properties.getPort())) + .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder + .setDefaultCredentialsProvider(credentialsProvider)).build(); + ElasticsearchTransport transport = new RestClientTransport(restClient, + new JacksonJsonpMapper(mapper)); + return new ElasticsearchClient(transport); + } + +} diff --git a/src/main/java/eu/dissco/exportjob/configuration/WebClientConfiguration.java b/src/main/java/eu/dissco/exportjob/configuration/WebClientConfiguration.java new file mode 100644 index 0000000..7827fe1 --- /dev/null +++ b/src/main/java/eu/dissco/exportjob/configuration/WebClientConfiguration.java @@ -0,0 +1,31 @@ +package eu.dissco.exportjob.configuration; + +import jakarta.validation.constraints.NotBlank; +import lombok.RequiredArgsConstructor; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.http.client.reactive.ReactorClientHttpConnector; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.netty.http.client.HttpClient; + + +@Configuration +@RequiredArgsConstructor +public class WebClientConfiguration { + + @NotBlank + @Value("${backend.endpoint}") + private String 
backendEndpoint; + + @Bean(name="exporter") + public WebClient webClient(){ + return WebClient.builder() + .clientConnector(new ReactorClientHttpConnector(HttpClient.create())) + .baseUrl(backendEndpoint) + .build(); + } + + + +} diff --git a/src/main/java/eu/dissco/exportjob/domain/JobRequest.java b/src/main/java/eu/dissco/exportjob/domain/JobRequest.java new file mode 100644 index 0000000..cc0002c --- /dev/null +++ b/src/main/java/eu/dissco/exportjob/domain/JobRequest.java @@ -0,0 +1,13 @@ +package eu.dissco.exportjob.domain; + +import java.util.List; +import java.util.UUID; + +public record JobRequest( + JobType jobType, + List searchParams, + TargetType targetType, + UUID jobId +) { + +} diff --git a/src/main/java/eu/dissco/exportjob/domain/JobType.java b/src/main/java/eu/dissco/exportjob/domain/JobType.java new file mode 100644 index 0000000..d1ec757 --- /dev/null +++ b/src/main/java/eu/dissco/exportjob/domain/JobType.java @@ -0,0 +1,7 @@ +package eu.dissco.exportjob.domain; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public enum JobType { + @JsonProperty("doi_list") DOI_LIST +} diff --git a/src/main/java/eu/dissco/exportjob/domain/SearchParam.java b/src/main/java/eu/dissco/exportjob/domain/SearchParam.java new file mode 100644 index 0000000..27f1f0c --- /dev/null +++ b/src/main/java/eu/dissco/exportjob/domain/SearchParam.java @@ -0,0 +1,8 @@ +package eu.dissco.exportjob.domain; + +public record SearchParam( + String inputField, + String inputValue +) { + +} diff --git a/src/main/java/eu/dissco/exportjob/domain/TargetType.java b/src/main/java/eu/dissco/exportjob/domain/TargetType.java new file mode 100644 index 0000000..fd72ca7 --- /dev/null +++ b/src/main/java/eu/dissco/exportjob/domain/TargetType.java @@ -0,0 +1,23 @@ +package eu.dissco.exportjob.domain; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Getter; + +public enum TargetType { + + @JsonProperty("https://doi.org/21.T11148/894b1e6cad57e921764e") + DIGITAL_SPECIMEN( + "https://doi.org/21.T11148/894b1e6cad57e921764e"), + + @JsonProperty("https://doi.org/21.T11148/bbad8c4e101e8af01115") + MEDIA_OBJECT( + "https://doi.org/21.T11148/bbad8c4e101e8af01115"); + + @Getter + private final String name; + + TargetType(String name) { + this.name = name; + } + +} diff --git a/src/main/java/eu/dissco/exportjob/exceptions/ElasticSearchException.java b/src/main/java/eu/dissco/exportjob/exceptions/ElasticSearchException.java new file mode 100644 index 0000000..78647c6 --- /dev/null +++ b/src/main/java/eu/dissco/exportjob/exceptions/ElasticSearchException.java @@ -0,0 +1,8 @@ +package eu.dissco.exportjob.exceptions; + +public class ElasticSearchException extends Exception { + public ElasticSearchException(){ + super(); + } + +} diff --git a/src/main/java/eu/dissco/exportjob/properties/ElasticSearchProperties.java b/src/main/java/eu/dissco/exportjob/properties/ElasticSearchProperties.java new file mode 100644 index 0000000..42c8304 --- /dev/null +++ b/src/main/java/eu/dissco/exportjob/properties/ElasticSearchProperties.java @@ -0,0 +1,32 @@ +package eu.dissco.exportjob.properties; + + +import jakarta.validation.constraints.NotBlank; +import jakarta.validation.constraints.NotNull; +import jakarta.validation.constraints.Positive; +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.validation.annotation.Validated; + +@Data +@Validated +@ConfigurationProperties("elasticsearch") +public class ElasticSearchProperties { + @NotBlank + 
private String hostname; + @Positive + private int port; + @NotBlank + private String username; + @NotBlank + private String password; + + @NotBlank + private String digitalSpecimenIndex = "digital-specimen"; + + @NotBlank + private String digitalMediaObjectIndex = "digital-media"; + + @NotNull + private int pageSize = 300; +} diff --git a/src/main/java/eu/dissco/exportjob/repository/ElasticSearchRepository.java b/src/main/java/eu/dissco/exportjob/repository/ElasticSearchRepository.java new file mode 100644 index 0000000..9076f5c --- /dev/null +++ b/src/main/java/eu/dissco/exportjob/repository/ElasticSearchRepository.java @@ -0,0 +1,100 @@ +package eu.dissco.exportjob.repository; + +import co.elastic.clients.elasticsearch.ElasticsearchClient; +import co.elastic.clients.elasticsearch._types.query_dsl.Query; +import co.elastic.clients.elasticsearch.core.CountRequest; +import co.elastic.clients.elasticsearch.core.SearchRequest; +import co.elastic.clients.elasticsearch.core.search.Hit; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import eu.dissco.exportjob.domain.SearchParam; +import eu.dissco.exportjob.domain.TargetType; +import eu.dissco.exportjob.properties.ElasticSearchProperties; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Repository; + +@Slf4j +@Repository +@RequiredArgsConstructor +public class ElasticSearchRepository { + private final ElasticsearchClient client; + private final ElasticSearchProperties properties; + + public Long getTotalHits(List searchParams, TargetType targetType) + throws IOException { + var query = generateQuery(searchParams); + var index = getIndex(targetType); + var countRequest = new CountRequest.Builder() + .index(index) + .query( + q -> q.bool(b -> b.must(query))) + .build(); + var count = client.count(countRequest); + return count.count(); + } + + public List getTargetObjects(List searchParams, TargetType targetType, + int pageNumber) + throws IOException { + var query = generateQuery(searchParams); + var index = getIndex(targetType); + + var searchRequest = new SearchRequest.Builder() + .index(index) + .query( + q -> q.bool(b -> b.must(query))) + .trackTotalHits(t -> t.enabled(Boolean.TRUE)) + .from(getOffset(pageNumber, properties.getPageSize())) + .size(properties.getPageSize()) + .build(); + var searchResult = client.search(searchRequest, ObjectNode.class); + + return searchResult.hits().hits().stream() + .map(Hit::source) + .filter(Objects::nonNull) + .map(JsonNode.class::cast) + .toList(); + } + + private String getIndex(TargetType targetType) { + return targetType == TargetType.DIGITAL_SPECIMEN ? 
properties.getDigitalSpecimenIndex() + : properties.getDigitalMediaObjectIndex(); + } + + + private static List generateQuery(List searchParams) { + var qList = new ArrayList(); + for (var searchParam : searchParams) { + var key = searchParam.inputField() + .replace("'", "") + .replace("[*]", "") + .replace("$", "") + .replace("[", "") + .replace("]", ".") + + "keyword"; + var val = searchParam.inputValue(); + if (val.isEmpty()) { + qList.add( + new Query.Builder().bool(b -> b.mustNot(q -> q.exists(e -> e.field(key)))).build()); + } else { + qList.add( + new Query.Builder().term(t -> t.field(key).value(val).caseInsensitive(true)).build()); + } + } + return qList; + } + + private static int getOffset(int pageNumber, int pageSize) { + int offset = 0; + if (pageNumber > 1) { + offset = offset + (pageSize * (pageNumber - 1)); + } + return offset; + } + +} diff --git a/src/main/java/eu/dissco/exportjob/service/ExportJobService.java b/src/main/java/eu/dissco/exportjob/service/ExportJobService.java new file mode 100644 index 0000000..32b6d00 --- /dev/null +++ b/src/main/java/eu/dissco/exportjob/service/ExportJobService.java @@ -0,0 +1,98 @@ +package eu.dissco.exportjob.service; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.opencsv.CSVWriter; +import com.fasterxml.jackson.databind.JsonNode; +import eu.dissco.exportjob.domain.JobRequest; +import eu.dissco.exportjob.domain.TargetType; +import eu.dissco.exportjob.repository.ElasticSearchRepository; +import eu.dissco.exportjob.web.ExporterBackendClient; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +@Service +@Slf4j +@RequiredArgsConstructor +public class ExportJobService { + + private final ElasticSearchRepository elasticSearchRepository; + private final ExporterBackendClient exporterBackendClient; + private static final String ODS_ID = "ods:ID"; + private static final String PHYSICAL_ID = "ods:physicalSpecimenID"; + + public void handleMessage(JobRequest jobRequest) { + exporterBackendClient.updateJobState(jobRequest.jobId(), "/running"); + try { + var searchResultFile = processSarchResults(jobRequest); + } catch (IOException e) { + log.error("An error has occurred", e); + exporterBackendClient.updateJobState(jobRequest.jobId(), "/failed"); + } + } + + + private File processSarchResults(JobRequest jobRequest) throws IOException { + var totalHits = elasticSearchRepository.getTotalHits(jobRequest.searchParams(), jobRequest.targetType()); + int pageNum = 1; + var hitsProcessed = 0; + var file = writeHeadersToFile(jobRequest); + while (hitsProcessed < totalHits) { + var searchResult = elasticSearchRepository.getTargetObjects(jobRequest.searchParams(), jobRequest.targetType(), pageNum); + writeResultsToFile(searchResult, jobRequest, file); + hitsProcessed = hitsProcessed + searchResult.size(); + pageNum = pageNum + 1; + } + return file; + } + + private File writeHeadersToFile(JobRequest jobRequest) throws IOException { + var file = new File("src/main/resources/tmp.csv"); + try ( + var fileWriter = new FileWriter(file); + var csvWriter = new CSVWriter(fileWriter)) { + var header = getHeadersFromJobType(jobRequest); + csvWriter.writeNext(header); + } + return file; + } + + private void writeResultsToFile(List searchResult, JobRequest jobRequest, File file) + throws IOException { + try ( + var fileWriter = 
new FileWriter(file, true); + var csvWriter = new CSVWriter(fileWriter)) { + switch (jobRequest.jobType()) { + case DOI_LIST -> writeDoiList(searchResult, csvWriter); + } + } + } + + private void writeDoiList(List searchResults, CSVWriter csvWriter) { + searchResults.forEach( + searchResult -> csvWriter.writeNext( + new String[]{searchResult.get(ODS_ID).asText(), + searchResult.get(PHYSICAL_ID).asText()}) + ); + } + + private static String[] getHeadersFromJobType(JobRequest jobRequest) { + switch (jobRequest.jobType()) { + case DOI_LIST -> { + return new String[]{ODS_ID, PHYSICAL_ID}; + } + default -> { + log.error("Unknown job type {}", jobRequest.jobType()); + throw new UnsupportedOperationException(); + } + } + } + + +} diff --git a/src/main/java/eu/dissco/exportjob/web/ExporterBackendClient.java b/src/main/java/eu/dissco/exportjob/web/ExporterBackendClient.java new file mode 100644 index 0000000..2970c0e --- /dev/null +++ b/src/main/java/eu/dissco/exportjob/web/ExporterBackendClient.java @@ -0,0 +1,48 @@ +package eu.dissco.exportjob.web; + +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.http.HttpMethod; +import org.springframework.stereotype.Component; +import org.springframework.web.reactive.function.client.WebClient; + +@Component +@RequiredArgsConstructor +@Slf4j +public class ExporterBackendClient { + + @Qualifier("exporter") + private final WebClient webClient; + + public void updateJobState(UUID jobId, String path) { + try { + webClient + .method(HttpMethod.POST) + .uri(uriBuilder -> uriBuilder.path("/" + jobId.toString() + path).build()) + .retrieve() + .toBodilessEntity().toFuture().get(); + } catch (ExecutionException | InterruptedException e) { + Thread.currentThread().interrupt(); + log.error("Unable to notify exporter backend that job {} is {}", jobId, + path.replace("/", "")); + } + } + + // todo -> define body in backend + public void markJobAsComplete(UUID jobId) { + try { + webClient + .method(HttpMethod.POST) + .uri(uriBuilder -> uriBuilder.path("/complete").build()) + .retrieve() + .toBodilessEntity().toFuture().get(); + } catch (ExecutionException | InterruptedException e) { + Thread.currentThread().interrupt(); + log.error("Unable to notify exporter backend that job {} is running", jobId); + } + } + +} diff --git a/src/test/java/eu/dissco/exportjob/DisscoExportJobApplicationTests.java b/src/test/java/eu/dissco/exportjob/DisscoExportJobApplicationTests.java deleted file mode 100644 index 73dce31..0000000 --- a/src/test/java/eu/dissco/exportjob/DisscoExportJobApplicationTests.java +++ /dev/null @@ -1,13 +0,0 @@ -package eu.dissco.exportjob; - -import org.junit.jupiter.api.Test; -import org.springframework.boot.test.context.SpringBootTest; - -@SpringBootTest -class DisscoExportJobApplicationTests { - - @Test - void contextLoads() { - } - -} diff --git a/src/test/java/eu/dissco/exportjob/service/ExportJobServiceTest.java b/src/test/java/eu/dissco/exportjob/service/ExportJobServiceTest.java new file mode 100644 index 0000000..3cd9634 --- /dev/null +++ b/src/test/java/eu/dissco/exportjob/service/ExportJobServiceTest.java @@ -0,0 +1,26 @@ +package eu.dissco.exportjob.service; + +import eu.dissco.exportjob.repository.ElasticSearchRepository; +import eu.dissco.exportjob.web.ExporterBackendClient; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import 
org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class ExportJobServiceTest { + private ExportJobService service; + + @Mock + private ElasticSearchRepository elasticSearchRepository; + @Mock + private ExporterBackendClient exporterBackendClient; + + @BeforeEach + void init(){ + service = new ExportJobService(elasticSearchRepository, exporterBackendClient); + } + + +} diff --git a/src/test/java/eu/dissco/exportjob/utils/TestUtils.java b/src/test/java/eu/dissco/exportjob/utils/TestUtils.java new file mode 100644 index 0000000..1c37d95 --- /dev/null +++ b/src/test/java/eu/dissco/exportjob/utils/TestUtils.java @@ -0,0 +1,36 @@ +package eu.dissco.exportjob.utils; + +import com.fasterxml.jackson.annotation.JsonInclude.Include; +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dissco.exportjob.domain.JobRequest; +import eu.dissco.exportjob.domain.JobType; +import eu.dissco.exportjob.domain.SearchParam; +import eu.dissco.exportjob.domain.TargetType; +import java.util.List; +import java.util.UUID; + +public class TestUtils { + + private TestUtils() { + } + + public static final UUID JOB_ID = UUID.fromString("cd5c9ee7-23b1-4615-993e-9d56d0720213"); + public static final ObjectMapper MAPPER = new ObjectMapper().findAndRegisterModules() + .setSerializationInclusion(Include.NON_NULL); + + private static JobRequest givenJobRequest() { + return new JobRequest( + JobType.DOI_LIST, + givenSearchParams(), + TargetType.DIGITAL_SPECIMEN, + UUID.randomUUID() + ); + } + + private static List givenSearchParams() { + return List.of(new SearchParam( + "ods:organisationID", "https://ror.org/03wkt5x30")); + } + + +} From e81d034362fdfe95edc5b9e3e4e5e51a90f53735 Mon Sep 17 00:00:00 2001 From: southeo Date: Thu, 17 Oct 2024 14:54:35 +0200 Subject: [PATCH 02/12] make service abstract --- .../java/eu/dissco/exportjob/Profiles.java | 7 ++ .../repository/ElasticSearchRepository.java | 10 +- .../service/AbstractExportJobService.java | 51 ++++++++++ .../exportjob/service/DoiListService.java | 49 ++++++++++ .../exportjob/service/ExportJobService.java | 98 ------------------- ...java => AbstractExportJobServiceTest.java} | 7 +- 6 files changed, 118 insertions(+), 104 deletions(-) create mode 100644 src/main/java/eu/dissco/exportjob/Profiles.java create mode 100644 src/main/java/eu/dissco/exportjob/service/AbstractExportJobService.java create mode 100644 src/main/java/eu/dissco/exportjob/service/DoiListService.java delete mode 100644 src/main/java/eu/dissco/exportjob/service/ExportJobService.java rename src/test/java/eu/dissco/exportjob/service/{ExportJobServiceTest.java => AbstractExportJobServiceTest.java} (74%) diff --git a/src/main/java/eu/dissco/exportjob/Profiles.java b/src/main/java/eu/dissco/exportjob/Profiles.java new file mode 100644 index 0000000..3a87e1d --- /dev/null +++ b/src/main/java/eu/dissco/exportjob/Profiles.java @@ -0,0 +1,7 @@ +package eu.dissco.exportjob; + +public class Profiles { + public static final String DOI_LIST = "doi_list"; + + private Profiles(){} +} diff --git a/src/main/java/eu/dissco/exportjob/repository/ElasticSearchRepository.java b/src/main/java/eu/dissco/exportjob/repository/ElasticSearchRepository.java index 9076f5c..a132a1a 100644 --- a/src/main/java/eu/dissco/exportjob/repository/ElasticSearchRepository.java +++ b/src/main/java/eu/dissco/exportjob/repository/ElasticSearchRepository.java @@ -14,6 +14,7 @@ import java.util.ArrayList; import java.util.List; 
import java.util.Objects; +import java.util.concurrent.ExecutionException; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Repository; @@ -34,8 +35,13 @@ public Long getTotalHits(List searchParams, TargetType targetType) .query( q -> q.bool(b -> b.must(query))) .build(); - var count = client.count(countRequest); - return count.count(); + try { + var count = client.count(countRequest); + return count.count(); + } catch (Exception e){ + log.error("error", e); + throw e; + } } public List getTargetObjects(List searchParams, TargetType targetType, diff --git a/src/main/java/eu/dissco/exportjob/service/AbstractExportJobService.java b/src/main/java/eu/dissco/exportjob/service/AbstractExportJobService.java new file mode 100644 index 0000000..6412261 --- /dev/null +++ b/src/main/java/eu/dissco/exportjob/service/AbstractExportJobService.java @@ -0,0 +1,51 @@ +package eu.dissco.exportjob.service; + +import com.fasterxml.jackson.databind.JsonNode; +import eu.dissco.exportjob.domain.JobRequest; +import eu.dissco.exportjob.repository.ElasticSearchRepository; +import eu.dissco.exportjob.web.ExporterBackendClient; +import java.io.IOException; +import java.util.List; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +@Service +@Slf4j +@RequiredArgsConstructor +public abstract class AbstractExportJobService { + + private final ElasticSearchRepository elasticSearchRepository; + private final ExporterBackendClient exporterBackendClient; + protected static final String TEMP_FILE_NAME = "src/main/resources/tmp.csv.gz"; + + public void handleMessage(JobRequest jobRequest) { + exporterBackendClient.updateJobState(jobRequest.jobId(), "/running"); + try { + processSearchResults(jobRequest); + } catch (IOException e) { + log.error("An error has occurred", e); + exporterBackendClient.updateJobState(jobRequest.jobId(), "/failed"); + } + } + + private void processSearchResults(JobRequest jobRequest) throws IOException { + var totalHits = elasticSearchRepository.getTotalHits(jobRequest.searchParams(), + jobRequest.targetType()); + int pageNum = 1; + var hitsProcessed = 0; + writeHeaderToFile(); + while (hitsProcessed < totalHits) { + var searchResult = elasticSearchRepository.getTargetObjects(jobRequest.searchParams(), + jobRequest.targetType(), pageNum); + writeResultsToFile(searchResult); + hitsProcessed = hitsProcessed + searchResult.size(); + pageNum = pageNum + 1; + } + } + + protected abstract void writeHeaderToFile() throws IOException; + + protected abstract void writeResultsToFile(List searchResult) throws IOException; + +} diff --git a/src/main/java/eu/dissco/exportjob/service/DoiListService.java b/src/main/java/eu/dissco/exportjob/service/DoiListService.java new file mode 100644 index 0000000..40a44f8 --- /dev/null +++ b/src/main/java/eu/dissco/exportjob/service/DoiListService.java @@ -0,0 +1,49 @@ +package eu.dissco.exportjob.service; + +import com.fasterxml.jackson.databind.JsonNode; +import eu.dissco.exportjob.Profiles; +import eu.dissco.exportjob.repository.ElasticSearchRepository; +import eu.dissco.exportjob.web.ExporterBackendClient; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.zip.GZIPOutputStream; +import org.springframework.context.annotation.Profile; +import org.springframework.stereotype.Service; + +@Service +@Profile(Profiles.DOI_LIST) +public class 
DoiListService extends AbstractExportJobService { + + private static final String ODS_ID = "ods:ID"; + private static final String PHYSICAL_ID = "ods:physicalSpecimenID"; + private static final byte[] HEADER = (ODS_ID + "," + PHYSICAL_ID).getBytes( + StandardCharsets.UTF_8); + + public DoiListService( + ElasticSearchRepository elasticSearchRepository, + ExporterBackendClient exporterBackendClient) { + super(elasticSearchRepository, exporterBackendClient); + } + + protected void writeHeaderToFile() throws IOException { + try ( + var byteOutputStream = new FileOutputStream(TEMP_FILE_NAME, true); + var gzip = new GZIPOutputStream(byteOutputStream)) { + gzip.write(HEADER, 0, HEADER.length); + } + } + + protected void writeResultsToFile(List searchResults) throws IOException { + try ( + var byteOutputStream = new FileOutputStream(TEMP_FILE_NAME, true); + var gzip = new GZIPOutputStream(byteOutputStream)) { + for (var result : searchResults) { + var col = ("\n" + result.get(ODS_ID).asText() + "," + result.get(PHYSICAL_ID).asText()) + .getBytes(StandardCharsets.UTF_8); + gzip.write(col, 0, col.length); + } + } + } +} diff --git a/src/main/java/eu/dissco/exportjob/service/ExportJobService.java b/src/main/java/eu/dissco/exportjob/service/ExportJobService.java deleted file mode 100644 index 32b6d00..0000000 --- a/src/main/java/eu/dissco/exportjob/service/ExportJobService.java +++ /dev/null @@ -1,98 +0,0 @@ -package eu.dissco.exportjob.service; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.opencsv.CSVWriter; -import com.fasterxml.jackson.databind.JsonNode; -import eu.dissco.exportjob.domain.JobRequest; -import eu.dissco.exportjob.domain.TargetType; -import eu.dissco.exportjob.repository.ElasticSearchRepository; -import eu.dissco.exportjob.web.ExporterBackendClient; -import java.io.File; -import java.io.FileWriter; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Service; - -@Service -@Slf4j -@RequiredArgsConstructor -public class ExportJobService { - - private final ElasticSearchRepository elasticSearchRepository; - private final ExporterBackendClient exporterBackendClient; - private static final String ODS_ID = "ods:ID"; - private static final String PHYSICAL_ID = "ods:physicalSpecimenID"; - - public void handleMessage(JobRequest jobRequest) { - exporterBackendClient.updateJobState(jobRequest.jobId(), "/running"); - try { - var searchResultFile = processSarchResults(jobRequest); - } catch (IOException e) { - log.error("An error has occurred", e); - exporterBackendClient.updateJobState(jobRequest.jobId(), "/failed"); - } - } - - - private File processSarchResults(JobRequest jobRequest) throws IOException { - var totalHits = elasticSearchRepository.getTotalHits(jobRequest.searchParams(), jobRequest.targetType()); - int pageNum = 1; - var hitsProcessed = 0; - var file = writeHeadersToFile(jobRequest); - while (hitsProcessed < totalHits) { - var searchResult = elasticSearchRepository.getTargetObjects(jobRequest.searchParams(), jobRequest.targetType(), pageNum); - writeResultsToFile(searchResult, jobRequest, file); - hitsProcessed = hitsProcessed + searchResult.size(); - pageNum = pageNum + 1; - } - return file; - } - - private File writeHeadersToFile(JobRequest jobRequest) throws IOException { - var file = new File("src/main/resources/tmp.csv"); - try ( - var fileWriter = new FileWriter(file); - var csvWriter = new 
CSVWriter(fileWriter)) { - var header = getHeadersFromJobType(jobRequest); - csvWriter.writeNext(header); - } - return file; - } - - private void writeResultsToFile(List searchResult, JobRequest jobRequest, File file) - throws IOException { - try ( - var fileWriter = new FileWriter(file, true); - var csvWriter = new CSVWriter(fileWriter)) { - switch (jobRequest.jobType()) { - case DOI_LIST -> writeDoiList(searchResult, csvWriter); - } - } - } - - private void writeDoiList(List searchResults, CSVWriter csvWriter) { - searchResults.forEach( - searchResult -> csvWriter.writeNext( - new String[]{searchResult.get(ODS_ID).asText(), - searchResult.get(PHYSICAL_ID).asText()}) - ); - } - - private static String[] getHeadersFromJobType(JobRequest jobRequest) { - switch (jobRequest.jobType()) { - case DOI_LIST -> { - return new String[]{ODS_ID, PHYSICAL_ID}; - } - default -> { - log.error("Unknown job type {}", jobRequest.jobType()); - throw new UnsupportedOperationException(); - } - } - } - - -} diff --git a/src/test/java/eu/dissco/exportjob/service/ExportJobServiceTest.java b/src/test/java/eu/dissco/exportjob/service/AbstractExportJobServiceTest.java similarity index 74% rename from src/test/java/eu/dissco/exportjob/service/ExportJobServiceTest.java rename to src/test/java/eu/dissco/exportjob/service/AbstractExportJobServiceTest.java index 3cd9634..f99ff26 100644 --- a/src/test/java/eu/dissco/exportjob/service/ExportJobServiceTest.java +++ b/src/test/java/eu/dissco/exportjob/service/AbstractExportJobServiceTest.java @@ -3,14 +3,13 @@ import eu.dissco.exportjob.repository.ElasticSearchRepository; import eu.dissco.exportjob.web.ExporterBackendClient; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @ExtendWith(MockitoExtension.class) -class ExportJobServiceTest { - private ExportJobService service; +class AbstractExportJobServiceTest { + private AbstractExportJobService service; @Mock private ElasticSearchRepository elasticSearchRepository; @@ -19,7 +18,7 @@ class ExportJobServiceTest { @BeforeEach void init(){ - service = new ExportJobService(elasticSearchRepository, exporterBackendClient); + //service = new AbstractExportJobService(elasticSearchRepository, exporterBackendClient); } From f15a9949e668d06a4c95ffb9a6939bc85dbb11cc Mon Sep 17 00:00:00 2001 From: southeo Date: Mon, 21 Oct 2024 10:47:00 +0200 Subject: [PATCH 03/12] testing --- pom.xml | 24 ++- .../eu/dissco/exportjob/ProjectRunner.java | 25 +++ .../component/JobRequestComponent.java | 50 ++++++ .../configuration/S3Configuration.java | 33 ++++ .../configuration/WebClientConfiguration.java | 19 ++- .../dissco/exportjob/domain/JobRequest.java | 1 - .../exceptions/ElasticSearchException.java | 8 - .../exceptions/FailedProcessingException.java | 12 ++ .../exportjob/properties/S3Properties.java | 18 ++ .../exportjob/properties/TokenProperties.java | 35 ++++ .../repository/ElasticSearchRepository.java | 3 +- .../exportjob/repository/S3Repository.java | 35 ++++ .../service/AbstractExportJobService.java | 11 +- .../exportjob/service/DoiListService.java | 2 +- .../exportjob/web/ExporterBackendClient.java | 24 ++- .../exportjob/web/TokenAuthenticator.java | 63 +++++++ .../repository/ElasticSearchRepositoryIT.java | 154 ++++++++++++++++++ .../eu/dissco/exportjob/utils/TestUtils.java | 23 ++- 18 files changed, 511 insertions(+), 29 deletions(-) create mode 100644 
src/main/java/eu/dissco/exportjob/ProjectRunner.java create mode 100644 src/main/java/eu/dissco/exportjob/component/JobRequestComponent.java create mode 100644 src/main/java/eu/dissco/exportjob/configuration/S3Configuration.java delete mode 100644 src/main/java/eu/dissco/exportjob/exceptions/ElasticSearchException.java create mode 100644 src/main/java/eu/dissco/exportjob/exceptions/FailedProcessingException.java create mode 100644 src/main/java/eu/dissco/exportjob/properties/S3Properties.java create mode 100644 src/main/java/eu/dissco/exportjob/properties/TokenProperties.java create mode 100644 src/main/java/eu/dissco/exportjob/repository/S3Repository.java create mode 100644 src/main/java/eu/dissco/exportjob/web/TokenAuthenticator.java create mode 100644 src/test/java/eu/dissco/exportjob/repository/ElasticSearchRepositoryIT.java diff --git a/pom.xml b/pom.xml index 9a776c4..9645578 100644 --- a/pom.xml +++ b/pom.xml @@ -35,6 +35,7 @@ 8.15.0 2.1.3 5.9 + 2.25.60 @@ -64,9 +65,21 @@ opencsv ${opencsv.version} - - - + + software.amazon.awssdk + s3 + ${amazon.awssdk.version} + + + software.amazon.awssdk + s3-transfer-manager + ${amazon.awssdk.version} + + + software.amazon.awssdk + aws-crt-client + ${amazon.awssdk.version} + org.testcontainers elasticsearch @@ -77,6 +90,11 @@ spring-boot-starter-test test + + org.testcontainers + junit-jupiter + test + diff --git a/src/main/java/eu/dissco/exportjob/ProjectRunner.java b/src/main/java/eu/dissco/exportjob/ProjectRunner.java new file mode 100644 index 0000000..08c6c8e --- /dev/null +++ b/src/main/java/eu/dissco/exportjob/ProjectRunner.java @@ -0,0 +1,25 @@ +package eu.dissco.exportjob; + +import eu.dissco.exportjob.component.JobRequestComponent; +import eu.dissco.exportjob.exceptions.FailedProcessingException; +import eu.dissco.exportjob.service.AbstractExportJobService; +import lombok.AllArgsConstructor; +import org.springframework.boot.CommandLineRunner; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.stereotype.Component; + +@Component +@AllArgsConstructor +public class ProjectRunner implements CommandLineRunner { + + private final AbstractExportJobService exportJobService; + private final ConfigurableApplicationContext context; + private final JobRequestComponent jobRequestComponent; + + @Override + public void run(String... 
args) throws FailedProcessingException { + var request = jobRequestComponent.getJobRequest(); + exportJobService.handleMessage(request); + context.close(); + } +} diff --git a/src/main/java/eu/dissco/exportjob/component/JobRequestComponent.java b/src/main/java/eu/dissco/exportjob/component/JobRequestComponent.java new file mode 100644 index 0000000..d06878e --- /dev/null +++ b/src/main/java/eu/dissco/exportjob/component/JobRequestComponent.java @@ -0,0 +1,50 @@ +package eu.dissco.exportjob.component; + +import eu.dissco.exportjob.domain.JobRequest; +import eu.dissco.exportjob.domain.SearchParam; +import eu.dissco.exportjob.domain.TargetType; +import jakarta.validation.constraints.NotBlank; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +@Component +@Slf4j +@RequiredArgsConstructor +public class JobRequestComponent { + + @NotBlank + @Value("#{'${job.input-fields}'.split(',')}") + List inputFields; + + @NotBlank + @Value("#{'${job.input-values}'.split(',')}") + List inputValues; + + @NotBlank + @Value("${job.target-type}") + TargetType targetType; + + @NotBlank + @Value("${job.id}") + UUID jobId; + + public JobRequest getJobRequest() { + var searchParams = new ArrayList(); + if (inputFields.size() != inputValues.size()) { + log.error("Mismatch between input fields and input values for searching"); + throw new IllegalStateException(); + } + for (int i = 0; i < inputFields.size(); i++) { + searchParams.add(new SearchParam(inputFields.get(i), inputValues.get(i))); + } + log.info("Received {} job request with id {} and {} search parameters", targetType, jobId, + searchParams); + return new JobRequest(searchParams, targetType, jobId); + } + +} diff --git a/src/main/java/eu/dissco/exportjob/configuration/S3Configuration.java b/src/main/java/eu/dissco/exportjob/configuration/S3Configuration.java new file mode 100644 index 0000000..5ce9718 --- /dev/null +++ b/src/main/java/eu/dissco/exportjob/configuration/S3Configuration.java @@ -0,0 +1,33 @@ +package eu.dissco.exportjob.configuration; + +import eu.dissco.exportjob.properties.S3Properties; +import lombok.RequiredArgsConstructor; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.transfer.s3.S3TransferManager; + +@Configuration +@RequiredArgsConstructor +public class S3Configuration { + private final S3Properties s3Properties; + + @Bean + public S3AsyncClient s3Client() { + return S3AsyncClient.crtBuilder() + .credentialsProvider(StaticCredentialsProvider.create( + AwsBasicCredentials.create(s3Properties.getAccessKey(), + s3Properties.getAccessSecret()))) + .region(Region.EU_WEST_2) + .build(); + } + + @Bean + public S3TransferManager transferManager() { + return S3TransferManager.builder().s3Client(s3Client()).build(); + } + +} diff --git a/src/main/java/eu/dissco/exportjob/configuration/WebClientConfiguration.java b/src/main/java/eu/dissco/exportjob/configuration/WebClientConfiguration.java index 7827fe1..543f5a0 100644 --- 
a/src/main/java/eu/dissco/exportjob/configuration/WebClientConfiguration.java +++ b/src/main/java/eu/dissco/exportjob/configuration/WebClientConfiguration.java @@ -5,6 +5,8 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.http.HttpHeaders; +import org.springframework.http.MediaType; import org.springframework.http.client.reactive.ReactorClientHttpConnector; import org.springframework.web.reactive.function.client.WebClient; import reactor.netty.http.client.HttpClient; @@ -15,17 +17,30 @@ public class WebClientConfiguration { @NotBlank - @Value("${backend.endpoint}") + @Value("${endpoint.backend}") private String backendEndpoint; - @Bean(name="exporter") + @NotBlank + @Value("${endpoint.token}") + private String tokenEndpoint; + + @Bean(name= "exporterClient") public WebClient webClient(){ return WebClient.builder() .clientConnector(new ReactorClientHttpConnector(HttpClient.create())) .baseUrl(backendEndpoint) + .defaultHeader(HttpHeaders.CONTENT_TYPE) .build(); } + @Bean(name = "tokenClient") + public WebClient tokenClient() { + return WebClient.builder() + .clientConnector(new ReactorClientHttpConnector(HttpClient.create())) + .baseUrl(tokenEndpoint) + .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_FORM_URLENCODED_VALUE) + .build(); + } } diff --git a/src/main/java/eu/dissco/exportjob/domain/JobRequest.java b/src/main/java/eu/dissco/exportjob/domain/JobRequest.java index cc0002c..b52bbfe 100644 --- a/src/main/java/eu/dissco/exportjob/domain/JobRequest.java +++ b/src/main/java/eu/dissco/exportjob/domain/JobRequest.java @@ -4,7 +4,6 @@ import java.util.UUID; public record JobRequest( - JobType jobType, List searchParams, TargetType targetType, UUID jobId diff --git a/src/main/java/eu/dissco/exportjob/exceptions/ElasticSearchException.java b/src/main/java/eu/dissco/exportjob/exceptions/ElasticSearchException.java deleted file mode 100644 index 78647c6..0000000 --- a/src/main/java/eu/dissco/exportjob/exceptions/ElasticSearchException.java +++ /dev/null @@ -1,8 +0,0 @@ -package eu.dissco.exportjob.exceptions; - -public class ElasticSearchException extends Exception { - public ElasticSearchException(){ - super(); - } - -} diff --git a/src/main/java/eu/dissco/exportjob/exceptions/FailedProcessingException.java b/src/main/java/eu/dissco/exportjob/exceptions/FailedProcessingException.java new file mode 100644 index 0000000..1d9d0ca --- /dev/null +++ b/src/main/java/eu/dissco/exportjob/exceptions/FailedProcessingException.java @@ -0,0 +1,12 @@ +package eu.dissco.exportjob.exceptions; + +public class FailedProcessingException extends Exception { + public FailedProcessingException(){ + super(); + } + + public FailedProcessingException(String s){ + super(s); + } + +} diff --git a/src/main/java/eu/dissco/exportjob/properties/S3Properties.java b/src/main/java/eu/dissco/exportjob/properties/S3Properties.java new file mode 100644 index 0000000..c9181c7 --- /dev/null +++ b/src/main/java/eu/dissco/exportjob/properties/S3Properties.java @@ -0,0 +1,18 @@ +package eu.dissco.exportjob.properties; + +import jakarta.validation.constraints.NotBlank; +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.validation.annotation.Validated; + +@Data +@Validated +@ConfigurationProperties(prefix = "s3") +public class S3Properties { + + @NotBlank + private String accessKey; + + 
@NotBlank + private String accessSecret; +} diff --git a/src/main/java/eu/dissco/exportjob/properties/TokenProperties.java b/src/main/java/eu/dissco/exportjob/properties/TokenProperties.java new file mode 100644 index 0000000..5c5f8fc --- /dev/null +++ b/src/main/java/eu/dissco/exportjob/properties/TokenProperties.java @@ -0,0 +1,35 @@ +package eu.dissco.exportjob.properties; + +import jakarta.annotation.PostConstruct; +import jakarta.validation.constraints.NotBlank; +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; +import org.springframework.validation.annotation.Validated; + +@Data +@Validated +@ConfigurationProperties("token") +public class TokenProperties { + + @NotBlank + private String secret; + + @NotBlank + private String id; + + @NotBlank + private String grantType; + + private MultiValueMap fromFormData; + + @PostConstruct + private void setProperties() { + fromFormData = new LinkedMultiValueMap<>(); + fromFormData.add("grant_type", grantType); + fromFormData.add("client_id", id); + fromFormData.add("client_secret", secret); + } + +} diff --git a/src/main/java/eu/dissco/exportjob/repository/ElasticSearchRepository.java b/src/main/java/eu/dissco/exportjob/repository/ElasticSearchRepository.java index a132a1a..0a3c265 100644 --- a/src/main/java/eu/dissco/exportjob/repository/ElasticSearchRepository.java +++ b/src/main/java/eu/dissco/exportjob/repository/ElasticSearchRepository.java @@ -39,7 +39,6 @@ public Long getTotalHits(List searchParams, TargetType targetType) var count = client.count(countRequest); return count.count(); } catch (Exception e){ - log.error("error", e); throw e; } } @@ -84,7 +83,7 @@ private static List generateQuery(List searchParams) { .replace("]", ".") + "keyword"; var val = searchParam.inputValue(); - if (val.isEmpty()) { + if (val == null || val.isEmpty()) { qList.add( new Query.Builder().bool(b -> b.mustNot(q -> q.exists(e -> e.field(key)))).build()); } else { diff --git a/src/main/java/eu/dissco/exportjob/repository/S3Repository.java b/src/main/java/eu/dissco/exportjob/repository/S3Repository.java new file mode 100644 index 0000000..d3d2341 --- /dev/null +++ b/src/main/java/eu/dissco/exportjob/repository/S3Repository.java @@ -0,0 +1,35 @@ +package eu.dissco.exportjob.repository; + +import java.io.File; +import java.util.UUID; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Repository; +import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.transfer.s3.S3TransferManager; + +@Repository +@RequiredArgsConstructor +public class S3Repository { + + private S3AsyncClient s3Client; + private S3TransferManager transferManager; + private static final String BUCKET_NAME = "dissco-download"; + + public void upload(File file, UUID jobId) { + var request = PutObjectRequest.builder() + .bucket(BUCKET_NAME) + .key(jobId.toString()) + .build(); + var body = AsyncRequestBody.fromFile(file); + var upload = transferManager.upload(builder -> builder + .requestBody(body) + .putObjectRequest(request) + .build()); + + + } + +} diff --git a/src/main/java/eu/dissco/exportjob/service/AbstractExportJobService.java b/src/main/java/eu/dissco/exportjob/service/AbstractExportJobService.java index 
6412261..d8737c8 100644 --- a/src/main/java/eu/dissco/exportjob/service/AbstractExportJobService.java +++ b/src/main/java/eu/dissco/exportjob/service/AbstractExportJobService.java @@ -2,13 +2,18 @@ import com.fasterxml.jackson.databind.JsonNode; import eu.dissco.exportjob.domain.JobRequest; +import eu.dissco.exportjob.exceptions.FailedProcessingException; import eu.dissco.exportjob.repository.ElasticSearchRepository; import eu.dissco.exportjob.web.ExporterBackendClient; +import java.io.File; import java.io.IOException; import java.util.List; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; +import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; @Service @Slf4j @@ -18,8 +23,10 @@ public abstract class AbstractExportJobService { private final ElasticSearchRepository elasticSearchRepository; private final ExporterBackendClient exporterBackendClient; protected static final String TEMP_FILE_NAME = "src/main/resources/tmp.csv.gz"; + private static final String BUCKET_NAME = "dissco-download"; - public void handleMessage(JobRequest jobRequest) { + + public void handleMessage(JobRequest jobRequest) throws FailedProcessingException { exporterBackendClient.updateJobState(jobRequest.jobId(), "/running"); try { processSearchResults(jobRequest); @@ -27,6 +34,8 @@ public void handleMessage(JobRequest jobRequest) { log.error("An error has occurred", e); exporterBackendClient.updateJobState(jobRequest.jobId(), "/failed"); } + // todo + //exporterBackendClient.markJobAsComplete(jobRequest.jobId()); } private void processSearchResults(JobRequest jobRequest) throws IOException { diff --git a/src/main/java/eu/dissco/exportjob/service/DoiListService.java b/src/main/java/eu/dissco/exportjob/service/DoiListService.java index 40a44f8..8a0bd4c 100644 --- a/src/main/java/eu/dissco/exportjob/service/DoiListService.java +++ b/src/main/java/eu/dissco/exportjob/service/DoiListService.java @@ -29,7 +29,7 @@ public DoiListService( protected void writeHeaderToFile() throws IOException { try ( - var byteOutputStream = new FileOutputStream(TEMP_FILE_NAME, true); + var byteOutputStream = new FileOutputStream(TEMP_FILE_NAME); var gzip = new GZIPOutputStream(byteOutputStream)) { gzip.write(HEADER, 0, HEADER.length); } diff --git a/src/main/java/eu/dissco/exportjob/web/ExporterBackendClient.java b/src/main/java/eu/dissco/exportjob/web/ExporterBackendClient.java index 2970c0e..7b25060 100644 --- a/src/main/java/eu/dissco/exportjob/web/ExporterBackendClient.java +++ b/src/main/java/eu/dissco/exportjob/web/ExporterBackendClient.java @@ -1,5 +1,6 @@ package eu.dissco.exportjob.web; +import eu.dissco.exportjob.exceptions.FailedProcessingException; import java.util.UUID; import java.util.concurrent.ExecutionException; import lombok.RequiredArgsConstructor; @@ -14,34 +15,43 @@ @Slf4j public class ExporterBackendClient { - @Qualifier("exporter") + @Qualifier("exporterClient") private final WebClient webClient; + private final TokenAuthenticator tokenAuthenticator; - public void updateJobState(UUID jobId, String path) { + public void updateJobState(UUID jobId, String path) throws FailedProcessingException { try { webClient .method(HttpMethod.POST) .uri(uriBuilder -> uriBuilder.path("/" + jobId.toString() + path).build()) + .header("Authorization", "Bearer " + tokenAuthenticator.getToken()) .retrieve() .toBodilessEntity().toFuture().get(); - 
} catch (ExecutionException | InterruptedException e) { - Thread.currentThread().interrupt(); + } catch (ExecutionException e) { log.error("Unable to notify exporter backend that job {} is {}", jobId, path.replace("/", "")); + } catch (InterruptedException e) { + log.error("Thread has been interrupted", e); + Thread.currentThread().interrupt(); + throw new FailedProcessingException(); } } // todo -> define body in backend - public void markJobAsComplete(UUID jobId) { + public void markJobAsComplete(UUID jobId) throws FailedProcessingException { try { webClient .method(HttpMethod.POST) .uri(uriBuilder -> uriBuilder.path("/complete").build()) + .header("Authorization", "Bearer " + tokenAuthenticator.getToken()) .retrieve() .toBodilessEntity().toFuture().get(); - } catch (ExecutionException | InterruptedException e) { - Thread.currentThread().interrupt(); + } catch (ExecutionException e) { log.error("Unable to notify exporter backend that job {} is running", jobId); + } catch (InterruptedException e){ + Thread.currentThread().interrupt(); + log.error("Thread has been interrupted", e); + throw new FailedProcessingException(); } } diff --git a/src/main/java/eu/dissco/exportjob/web/TokenAuthenticator.java b/src/main/java/eu/dissco/exportjob/web/TokenAuthenticator.java new file mode 100644 index 0000000..3fe249e --- /dev/null +++ b/src/main/java/eu/dissco/exportjob/web/TokenAuthenticator.java @@ -0,0 +1,63 @@ +package eu.dissco.exportjob.web; + +import com.fasterxml.jackson.databind.JsonNode; +import eu.dissco.exportjob.exceptions.FailedProcessingException; +import eu.dissco.exportjob.properties.TokenProperties; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.concurrent.ExecutionException; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.http.HttpStatus; +import org.springframework.stereotype.Component; +import org.springframework.web.reactive.function.BodyInserters; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; + +@Component +@RequiredArgsConstructor +@Slf4j +public class TokenAuthenticator { + + @Qualifier("tokenClient") + private final WebClient tokenClient; + private final TokenProperties properties; + + public String getToken() throws FailedProcessingException { + var response = tokenClient + .post() + .body(BodyInserters.fromFormData(properties.getFromFormData())) + .acceptCharset(StandardCharsets.UTF_8) + .retrieve() + .onStatus(HttpStatus.UNAUTHORIZED::equals, + r -> Mono.error(new FailedProcessingException("Service is unauthorized."))) + .bodyToMono(JsonNode.class) + .retryWhen(Retry.fixedDelay(3, Duration.ofSeconds(2)) + .onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> + new FailedProcessingException( + "Token Authentication failed to process after max retries") + )); + try { + var tokenNode = response.toFuture().get(); + return getToken(tokenNode); + } catch (InterruptedException | ExecutionException e) { + Thread.currentThread().interrupt(); + log.error("Token authentication: Unable to authenticate processing service with Keycloak. Verify client secret is up to-date"); + throw new FailedProcessingException( + "Unable to authenticate processing service with Keycloak. 
More information: " + + e.getMessage()); + } + } + + private String getToken(JsonNode tokenNode) throws FailedProcessingException { + if (tokenNode != null && tokenNode.get("access_token") != null) { + return tokenNode.get("access_token").asText(); + } + log.debug("Unexpected response from keycloak server. Unable to parse access_token"); + throw new FailedProcessingException( + "Unable to authenticate processing service with Keycloak. An error has occurred parsing keycloak response"); + } + +} diff --git a/src/test/java/eu/dissco/exportjob/repository/ElasticSearchRepositoryIT.java b/src/test/java/eu/dissco/exportjob/repository/ElasticSearchRepositoryIT.java new file mode 100644 index 0000000..19cf9eb --- /dev/null +++ b/src/test/java/eu/dissco/exportjob/repository/ElasticSearchRepositoryIT.java @@ -0,0 +1,154 @@ +package eu.dissco.exportjob.repository; + +import static eu.dissco.exportjob.utils.TestUtils.DOI_2; +import static eu.dissco.exportjob.utils.TestUtils.MAPPER; +import static eu.dissco.exportjob.utils.TestUtils.ORG_2; +import static eu.dissco.exportjob.utils.TestUtils.PHYS_ID_2; +import static eu.dissco.exportjob.utils.TestUtils.givenDigitalSpecimen; +import static eu.dissco.exportjob.utils.TestUtils.givenSearchParams; +import static org.assertj.core.api.Assertions.assertThat; + +import co.elastic.clients.elasticsearch.ElasticsearchClient; +import co.elastic.clients.elasticsearch.core.BulkRequest; +import co.elastic.clients.json.jackson.JacksonJsonpMapper; +import co.elastic.clients.transport.ElasticsearchTransport; +import co.elastic.clients.transport.rest_client.RestClientTransport; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import eu.dissco.exportjob.domain.SearchParam; +import eu.dissco.exportjob.domain.TargetType; +import eu.dissco.exportjob.properties.ElasticSearchProperties; +import java.io.IOException; +import java.util.List; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.elasticsearch.ElasticsearchContainer; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +@Testcontainers +public class ElasticSearchRepositoryIT { + + private static final DockerImageName ELASTIC_IMAGE = DockerImageName.parse( + "docker.elastic.co/elasticsearch/elasticsearch").withTag("8.7.1"); + private static final String ELASTICSEARCH_USERNAME = "elastic"; + private static final String ELASTICSEARCH_PASSWORD = "s3cret"; + private static final ElasticsearchContainer container = new ElasticsearchContainer( + ELASTIC_IMAGE).withPassword(ELASTICSEARCH_PASSWORD); + private static ElasticsearchClient client; + private static RestClient restClient; + private final ElasticSearchProperties properties = new ElasticSearchProperties(); + private ElasticSearchRepository elasticRepository; + private static final String DIGITAL_SPECIMEN_INDEX = "digital-specimen"; + private static final String DIGITAL_MEDIA_INDEX = "digital-media"; + + @BeforeAll + static void initContainer() { + // 
Create the elasticsearch container. + container.start(); + + final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(AuthScope.ANY, + new UsernamePasswordCredentials(ELASTICSEARCH_USERNAME, ELASTICSEARCH_PASSWORD)); + + HttpHost host = new HttpHost("localhost", + container.getMappedPort(9200), "https"); + final RestClientBuilder builder = RestClient.builder(host); + + builder.setHttpClientConfigCallback(clientBuilder -> { + clientBuilder.setSSLContext(container.createSslContextFromCa()); + clientBuilder.setDefaultCredentialsProvider(credentialsProvider); + return clientBuilder; + }); + restClient = builder.build(); + + ElasticsearchTransport transport = new RestClientTransport(restClient, + new JacksonJsonpMapper(MAPPER)); + + client = new ElasticsearchClient(transport); + } + + @AfterAll + public static void closeResources() throws Exception { + restClient.close(); + } + + @BeforeEach + void initRepository() { + elasticRepository = new ElasticSearchRepository(client, properties); + } + + @AfterEach + void clearIndex() throws IOException { + if (client.indices().exists(re -> re.index(DIGITAL_SPECIMEN_INDEX)).value()) { + client.indices().delete(b -> b.index(DIGITAL_SPECIMEN_INDEX)); + } + if (client.indices().exists(re -> re.index(DIGITAL_MEDIA_INDEX)).value()) { + client.indices().delete(b -> b.index(DIGITAL_MEDIA_INDEX)); + } + } + + @Test + void testGetTotalHits() throws IOException { + // Given + postDigitalSpecimens( + List.of(givenDigitalSpecimen(), givenDigitalSpecimen(DOI_2, ORG_2, PHYS_ID_2))); + + // When + var result = elasticRepository.getTotalHits(givenSearchParams(), TargetType.DIGITAL_SPECIMEN); + + // Then + assertThat(result).isEqualTo(1L); + } + + @Test + void testGetTargetObject() throws IOException { + // Given + postDigitalSpecimens( + List.of(givenDigitalSpecimen(), givenDigitalSpecimen(DOI_2, ORG_2, PHYS_ID_2))); + + // When + var result = elasticRepository.getTargetObjects(givenSearchParams(), + TargetType.DIGITAL_SPECIMEN, 1); + + // Then + assertThat(result).isEqualTo(List.of(givenDigitalSpecimen())); + } + + @Test + void testGetTargetObjectEmptyOrg() throws IOException { + // Given + var expected = ((ObjectNode) (givenDigitalSpecimen("doi.org/1", ORG_2, PHYS_ID_2))); + expected.remove("ods:organisationID"); + postDigitalSpecimens( + List.of(givenDigitalSpecimen(), givenDigitalSpecimen(DOI_2, ORG_2, PHYS_ID_2), expected)); + var searchParam = List.of(new SearchParam("$['ods:organisationID']", null)); + + // When + var result = elasticRepository.getTargetObjects(searchParam, TargetType.DIGITAL_SPECIMEN, 1); + + // Then + assertThat(result).isEqualTo(List.of(expected)); + } + + private void postDigitalSpecimens(List digitalSpecimens) throws IOException { + var bulkRequest = new BulkRequest.Builder(); + for (var digitalSpecimen : digitalSpecimens) { + bulkRequest.operations(op -> op.index( + idx -> idx.index(DIGITAL_SPECIMEN_INDEX).id(digitalSpecimen.get("@id").asText()) + .document(digitalSpecimen))); + } + client.bulk(bulkRequest.build()); + client.indices().refresh(b -> b.index(DIGITAL_SPECIMEN_INDEX)); + } +} diff --git a/src/test/java/eu/dissco/exportjob/utils/TestUtils.java b/src/test/java/eu/dissco/exportjob/utils/TestUtils.java index 1c37d95..80862a3 100644 --- a/src/test/java/eu/dissco/exportjob/utils/TestUtils.java +++ b/src/test/java/eu/dissco/exportjob/utils/TestUtils.java @@ -1,9 +1,9 @@ package eu.dissco.exportjob.utils; import com.fasterxml.jackson.annotation.JsonInclude.Include; +import 
com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dissco.exportjob.domain.JobRequest; -import eu.dissco.exportjob.domain.JobType; import eu.dissco.exportjob.domain.SearchParam; import eu.dissco.exportjob.domain.TargetType; import java.util.List; @@ -14,23 +14,38 @@ public class TestUtils { private TestUtils() { } + public static final String DOI_1 = "https://doi.org/10.2055/123-456-789"; + public static final String DOI_2 = "https://doi.org/10.2055/ABC-EFG-HIJ"; + public static final String ORG_1 = "https://ror.org/0566bfb96"; + public static final String ORG_2 = "https://ror.org/040ck2b86"; + public static final String PHYS_ID_1 = "AVES.XYZ"; + public static final String PHYS_ID_2 = "AVES.QRS"; public static final UUID JOB_ID = UUID.fromString("cd5c9ee7-23b1-4615-993e-9d56d0720213"); public static final ObjectMapper MAPPER = new ObjectMapper().findAndRegisterModules() .setSerializationInclusion(Include.NON_NULL); private static JobRequest givenJobRequest() { return new JobRequest( - JobType.DOI_LIST, givenSearchParams(), TargetType.DIGITAL_SPECIMEN, UUID.randomUUID() ); } - private static List givenSearchParams() { + public static List givenSearchParams() { return List.of(new SearchParam( - "ods:organisationID", "https://ror.org/03wkt5x30")); + "$['ods:organisationID']", ORG_1)); } + public static JsonNode givenDigitalSpecimen(){ + return givenDigitalSpecimen(DOI_1, ORG_1, PHYS_ID_1); + } + public static JsonNode givenDigitalSpecimen(String doi, String org, String physId){ + return MAPPER.createObjectNode() + .put("ods:ID", doi) + .put("@id", doi) + .put("ods:organisationID", org) + .put("ods:physicalSpecimenID", physId); + } } From e1d72a7c599f9596dcab96fd1421885bee4e8a86 Mon Sep 17 00:00:00 2001 From: southeo Date: Tue, 22 Oct 2024 14:34:44 +0200 Subject: [PATCH 04/12] testing --- .../component/JobRequestComponent.java | 41 ++++------ .../exportjob/domain/JobStateEndpoint.java | 14 ++++ .../exportjob/properties/JobProperties.java | 32 ++++++++ .../repository/ElasticSearchRepository.java | 8 +- .../exportjob/repository/S3Repository.java | 36 ++++----- .../service/AbstractExportJobService.java | 45 ++++++----- .../exportjob/service/DoiListService.java | 7 +- .../exportjob/web/ExporterBackendClient.java | 10 ++- .../component/JobRequestComponentTest.java | 74 +++++++++++++++++++ .../service/AbstractExportJobServiceTest.java | 25 ------- .../exportjob/service/DoiListServiceTest.java | 70 ++++++++++++++++++ .../eu/dissco/exportjob/utils/TestUtils.java | 8 +- src/test/resources/temp.gz | 0 13 files changed, 266 insertions(+), 104 deletions(-) create mode 100644 src/main/java/eu/dissco/exportjob/domain/JobStateEndpoint.java create mode 100644 src/main/java/eu/dissco/exportjob/properties/JobProperties.java create mode 100644 src/test/java/eu/dissco/exportjob/component/JobRequestComponentTest.java delete mode 100644 src/test/java/eu/dissco/exportjob/service/AbstractExportJobServiceTest.java create mode 100644 src/test/java/eu/dissco/exportjob/service/DoiListServiceTest.java create mode 100644 src/test/resources/temp.gz diff --git a/src/main/java/eu/dissco/exportjob/component/JobRequestComponent.java b/src/main/java/eu/dissco/exportjob/component/JobRequestComponent.java index d06878e..ed22dcb 100644 --- a/src/main/java/eu/dissco/exportjob/component/JobRequestComponent.java +++ b/src/main/java/eu/dissco/exportjob/component/JobRequestComponent.java @@ -1,15 +1,14 @@ package eu.dissco.exportjob.component; import 
eu.dissco.exportjob.domain.JobRequest; +import eu.dissco.exportjob.domain.JobStateEndpoint; import eu.dissco.exportjob.domain.SearchParam; -import eu.dissco.exportjob.domain.TargetType; -import jakarta.validation.constraints.NotBlank; +import eu.dissco.exportjob.exceptions.FailedProcessingException; +import eu.dissco.exportjob.properties.JobProperties; +import eu.dissco.exportjob.web.ExporterBackendClient; import java.util.ArrayList; -import java.util.List; -import java.util.UUID; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; @Component @@ -17,34 +16,22 @@ @RequiredArgsConstructor public class JobRequestComponent { - @NotBlank - @Value("#{'${job.input-fields}'.split(',')}") - List inputFields; + private final JobProperties properties; + private final ExporterBackendClient client; - @NotBlank - @Value("#{'${job.input-values}'.split(',')}") - List inputValues; - - @NotBlank - @Value("${job.target-type}") - TargetType targetType; - - @NotBlank - @Value("${job.id}") - UUID jobId; - - public JobRequest getJobRequest() { + public JobRequest getJobRequest() throws FailedProcessingException { var searchParams = new ArrayList(); - if (inputFields.size() != inputValues.size()) { + if (properties.getInputFields().size() != properties.getInputValues().size()) { log.error("Mismatch between input fields and input values for searching"); - throw new IllegalStateException(); + client.updateJobState(properties.getJobId(), JobStateEndpoint.FAILED.getEndpoint()); + throw new FailedProcessingException(); } - for (int i = 0; i < inputFields.size(); i++) { - searchParams.add(new SearchParam(inputFields.get(i), inputValues.get(i))); + for (int i = 0; i < properties.getInputFields().size(); i++) { + searchParams.add(new SearchParam(properties.getInputFields().get(i), properties.getInputValues().get(i))); } - log.info("Received {} job request with id {} and {} search parameters", targetType, jobId, + log.info("Received {} job request with id {} and {} search parameters", properties.getTargetType(), properties.getJobId(), searchParams); - return new JobRequest(searchParams, targetType, jobId); + return new JobRequest(searchParams, properties.getTargetType(), properties.getJobId()); } } diff --git a/src/main/java/eu/dissco/exportjob/domain/JobStateEndpoint.java b/src/main/java/eu/dissco/exportjob/domain/JobStateEndpoint.java new file mode 100644 index 0000000..836b360 --- /dev/null +++ b/src/main/java/eu/dissco/exportjob/domain/JobStateEndpoint.java @@ -0,0 +1,14 @@ +package eu.dissco.exportjob.domain; + +import lombok.Getter; + +@Getter +public enum JobStateEndpoint { + FAILED("/failed"), + RUNNING("/running"); + + private final String endpoint; + JobStateEndpoint(String s){ + this.endpoint = s; + } +} diff --git a/src/main/java/eu/dissco/exportjob/properties/JobProperties.java b/src/main/java/eu/dissco/exportjob/properties/JobProperties.java new file mode 100644 index 0000000..01a486d --- /dev/null +++ b/src/main/java/eu/dissco/exportjob/properties/JobProperties.java @@ -0,0 +1,32 @@ +package eu.dissco.exportjob.properties; + +import eu.dissco.exportjob.domain.TargetType; +import jakarta.validation.constraints.NotBlank; +import java.util.List; +import java.util.UUID; +import lombok.Data; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.context.properties.ConfigurationProperties; +import 
org.springframework.validation.annotation.Validated; + +@Data +@Validated +@ConfigurationProperties(prefix = "job") +public class JobProperties { + @NotBlank + @Value("#{'${job.input-fields}'.split(',')}") + List inputFields; + + @NotBlank + @Value("#{'${job.input-values}'.split(',')}") + List inputValues; + + @NotBlank + @Value("${job.target-type}") + TargetType targetType; + + @NotBlank + @Value("${job.id}") + UUID jobId; + +} diff --git a/src/main/java/eu/dissco/exportjob/repository/ElasticSearchRepository.java b/src/main/java/eu/dissco/exportjob/repository/ElasticSearchRepository.java index 0a3c265..dc57fc5 100644 --- a/src/main/java/eu/dissco/exportjob/repository/ElasticSearchRepository.java +++ b/src/main/java/eu/dissco/exportjob/repository/ElasticSearchRepository.java @@ -14,7 +14,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; -import java.util.concurrent.ExecutionException; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Repository; @@ -35,12 +34,7 @@ public Long getTotalHits(List searchParams, TargetType targetType) .query( q -> q.bool(b -> b.must(query))) .build(); - try { - var count = client.count(countRequest); - return count.count(); - } catch (Exception e){ - throw e; - } + return client.count(countRequest).count(); } public List getTargetObjects(List searchParams, TargetType targetType, diff --git a/src/main/java/eu/dissco/exportjob/repository/S3Repository.java b/src/main/java/eu/dissco/exportjob/repository/S3Repository.java index d3d2341..b548667 100644 --- a/src/main/java/eu/dissco/exportjob/repository/S3Repository.java +++ b/src/main/java/eu/dissco/exportjob/repository/S3Repository.java @@ -4,32 +4,32 @@ import java.util.UUID; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Repository; -import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.services.s3.S3AsyncClient; -import software.amazon.awssdk.services.s3.S3Client; -import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.transfer.s3.S3TransferManager; @Repository @RequiredArgsConstructor public class S3Repository { - private S3AsyncClient s3Client; - private S3TransferManager transferManager; - private static final String BUCKET_NAME = "dissco-download"; - - public void upload(File file, UUID jobId) { - var request = PutObjectRequest.builder() - .bucket(BUCKET_NAME) - .key(jobId.toString()) - .build(); - var body = AsyncRequestBody.fromFile(file); - var upload = transferManager.upload(builder -> builder - .requestBody(body) - .putObjectRequest(request) - .build()); - + private final S3AsyncClient s3Client; + private static final String BUCKET_NAME = "dissco-data-export"; + public String uploadResults(File file, UUID jobId) { + try (var transferManager = S3TransferManager.builder().s3Client(s3Client).build()) { + var upload = transferManager + .uploadFile(uploadFileRequest -> uploadFileRequest + .putObjectRequest(putObjectRequest -> putObjectRequest + .bucket(BUCKET_NAME) + .key(jobId.toString())) + .source(file)); + upload.completionFuture().join(); + return s3Client.utilities().getUrl( + builder -> builder + .bucket(BUCKET_NAME) + .key(jobId.toString())) + .toString(); + } } + } diff --git a/src/main/java/eu/dissco/exportjob/service/AbstractExportJobService.java b/src/main/java/eu/dissco/exportjob/service/AbstractExportJobService.java index d8737c8..b5fda4f 100644 --- 
a/src/main/java/eu/dissco/exportjob/service/AbstractExportJobService.java +++ b/src/main/java/eu/dissco/exportjob/service/AbstractExportJobService.java @@ -2,8 +2,10 @@ import com.fasterxml.jackson.databind.JsonNode; import eu.dissco.exportjob.domain.JobRequest; +import eu.dissco.exportjob.domain.JobStateEndpoint; import eu.dissco.exportjob.exceptions.FailedProcessingException; import eu.dissco.exportjob.repository.ElasticSearchRepository; +import eu.dissco.exportjob.repository.S3Repository; import eu.dissco.exportjob.web.ExporterBackendClient; import java.io.File; import java.io.IOException; @@ -11,9 +13,6 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; -import software.amazon.awssdk.core.async.AsyncRequestBody; -import software.amazon.awssdk.services.s3.S3AsyncClient; -import software.amazon.awssdk.services.s3.model.PutObjectRequest; @Service @Slf4j @@ -22,35 +21,43 @@ public abstract class AbstractExportJobService { private final ElasticSearchRepository elasticSearchRepository; private final ExporterBackendClient exporterBackendClient; + private final S3Repository s3Repository; protected static final String TEMP_FILE_NAME = "src/main/resources/tmp.csv.gz"; - private static final String BUCKET_NAME = "dissco-download"; public void handleMessage(JobRequest jobRequest) throws FailedProcessingException { - exporterBackendClient.updateJobState(jobRequest.jobId(), "/running"); + exporterBackendClient.updateJobState(jobRequest.jobId(), JobStateEndpoint.RUNNING.getEndpoint()); try { - processSearchResults(jobRequest); + var uploadData = processSearchResults(jobRequest); + if (uploadData){ + var url = s3Repository.uploadResults(new File(TEMP_FILE_NAME), jobRequest.jobId()); + exporterBackendClient.markJobAsComplete(jobRequest.jobId(), url); + } else { + exporterBackendClient.markJobAsComplete(jobRequest.jobId(), null); + } } catch (IOException e) { log.error("An error has occurred", e); - exporterBackendClient.updateJobState(jobRequest.jobId(), "/failed"); + exporterBackendClient.updateJobState(jobRequest.jobId(), JobStateEndpoint.FAILED.getEndpoint()); } - // todo - //exporterBackendClient.markJobAsComplete(jobRequest.jobId()); } - private void processSearchResults(JobRequest jobRequest) throws IOException { + private boolean processSearchResults(JobRequest jobRequest) throws IOException { var totalHits = elasticSearchRepository.getTotalHits(jobRequest.searchParams(), jobRequest.targetType()); - int pageNum = 1; - var hitsProcessed = 0; - writeHeaderToFile(); - while (hitsProcessed < totalHits) { - var searchResult = elasticSearchRepository.getTargetObjects(jobRequest.searchParams(), - jobRequest.targetType(), pageNum); - writeResultsToFile(searchResult); - hitsProcessed = hitsProcessed + searchResult.size(); - pageNum = pageNum + 1; + if (totalHits > 0){ + int pageNum = 1; + var hitsProcessed = 0; + writeHeaderToFile(); + while (hitsProcessed < totalHits) { + var searchResult = elasticSearchRepository.getTargetObjects(jobRequest.searchParams(), + jobRequest.targetType(), pageNum); + writeResultsToFile(searchResult); + hitsProcessed = hitsProcessed + searchResult.size(); + pageNum = pageNum + 1; + } + return true; } + return false; } protected abstract void writeHeaderToFile() throws IOException; diff --git a/src/main/java/eu/dissco/exportjob/service/DoiListService.java b/src/main/java/eu/dissco/exportjob/service/DoiListService.java index 8a0bd4c..57a3921 100644 --- 
a/src/main/java/eu/dissco/exportjob/service/DoiListService.java +++ b/src/main/java/eu/dissco/exportjob/service/DoiListService.java @@ -3,6 +3,7 @@ import com.fasterxml.jackson.databind.JsonNode; import eu.dissco.exportjob.Profiles; import eu.dissco.exportjob.repository.ElasticSearchRepository; +import eu.dissco.exportjob.repository.S3Repository; import eu.dissco.exportjob.web.ExporterBackendClient; import java.io.FileOutputStream; import java.io.IOException; @@ -22,9 +23,9 @@ public class DoiListService extends AbstractExportJobService { StandardCharsets.UTF_8); public DoiListService( - ElasticSearchRepository elasticSearchRepository, - ExporterBackendClient exporterBackendClient) { - super(elasticSearchRepository, exporterBackendClient); + ElasticSearchRepository elasticSearchRepository, ExporterBackendClient exporterBackendClient, + S3Repository s3Repository) { + super(elasticSearchRepository, exporterBackendClient, s3Repository); } protected void writeHeaderToFile() throws IOException { diff --git a/src/main/java/eu/dissco/exportjob/web/ExporterBackendClient.java b/src/main/java/eu/dissco/exportjob/web/ExporterBackendClient.java index 7b25060..bdb9d58 100644 --- a/src/main/java/eu/dissco/exportjob/web/ExporterBackendClient.java +++ b/src/main/java/eu/dissco/exportjob/web/ExporterBackendClient.java @@ -1,5 +1,6 @@ package eu.dissco.exportjob.web; +import com.fasterxml.jackson.databind.ObjectMapper; import eu.dissco.exportjob.exceptions.FailedProcessingException; import java.util.UUID; import java.util.concurrent.ExecutionException; @@ -8,6 +9,7 @@ import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.http.HttpMethod; import org.springframework.stereotype.Component; +import org.springframework.web.reactive.function.BodyInserters; import org.springframework.web.reactive.function.client.WebClient; @Component @@ -17,6 +19,7 @@ public class ExporterBackendClient { @Qualifier("exporterClient") private final WebClient webClient; + private final ObjectMapper mapper; private final TokenAuthenticator tokenAuthenticator; public void updateJobState(UUID jobId, String path) throws FailedProcessingException { @@ -37,13 +40,16 @@ public void updateJobState(UUID jobId, String path) throws FailedProcessingExcep } } - // todo -> define body in backend - public void markJobAsComplete(UUID jobId) throws FailedProcessingException { + public void markJobAsComplete(UUID jobId, String downloadLink) throws FailedProcessingException { + var body = mapper.createObjectNode() + .put("id", jobId.toString()) + .put("downloadLink", downloadLink); try { webClient .method(HttpMethod.POST) .uri(uriBuilder -> uriBuilder.path("/complete").build()) .header("Authorization", "Bearer " + tokenAuthenticator.getToken()) + .body(BodyInserters.fromValue(body)) .retrieve() .toBodilessEntity().toFuture().get(); } catch (ExecutionException e) { diff --git a/src/test/java/eu/dissco/exportjob/component/JobRequestComponentTest.java b/src/test/java/eu/dissco/exportjob/component/JobRequestComponentTest.java new file mode 100644 index 0000000..e5a67a3 --- /dev/null +++ b/src/test/java/eu/dissco/exportjob/component/JobRequestComponentTest.java @@ -0,0 +1,74 @@ +package eu.dissco.exportjob.component; + +import static eu.dissco.exportjob.utils.TestUtils.JOB_ID; +import static eu.dissco.exportjob.utils.TestUtils.ORG_1; +import static eu.dissco.exportjob.utils.TestUtils.ORG_2; +import static eu.dissco.exportjob.utils.TestUtils.ORG_FIELD_NAME; +import static org.assertj.core.api.Assertions.assertThat; +import 
static org.junit.Assert.assertThrows; +import static org.mockito.BDDMockito.then; + +import eu.dissco.exportjob.domain.JobRequest; +import eu.dissco.exportjob.domain.JobStateEndpoint; +import eu.dissco.exportjob.domain.SearchParam; +import eu.dissco.exportjob.domain.TargetType; +import eu.dissco.exportjob.exceptions.FailedProcessingException; +import eu.dissco.exportjob.properties.JobProperties; +import eu.dissco.exportjob.web.ExporterBackendClient; +import java.util.List; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class JobRequestComponentTest { + + private JobRequestComponent jobRequestComponent; + private JobProperties properties; + @Mock + private ExporterBackendClient client; + + @BeforeEach + void init(){ + properties = new JobProperties(); + jobRequestComponent = new JobRequestComponent(properties, client); + } + + @Test + void testHandleMessage() throws FailedProcessingException { + // Given + var expected = new JobRequest( + List.of(new SearchParam(ORG_FIELD_NAME, ORG_1), new SearchParam(ORG_FIELD_NAME, ORG_2)), + TargetType.DIGITAL_SPECIMEN, + JOB_ID + ); + properties.setInputFields(List.of(ORG_FIELD_NAME, ORG_FIELD_NAME)); + properties.setJobId(JOB_ID); + properties.setInputValues(List.of(ORG_1, ORG_2)); + properties.setTargetType(TargetType.DIGITAL_SPECIMEN); + + // When + var result = jobRequestComponent.getJobRequest(); + + // Then + assertThat(result).isEqualTo(expected); + } + + @Test + void testHandleMessageInvalidParams() throws FailedProcessingException { + // Given + properties.setInputFields(List.of(ORG_FIELD_NAME)); + properties.setJobId(JOB_ID); + properties.setInputValues(List.of(ORG_1, ORG_2)); + properties.setTargetType(TargetType.DIGITAL_SPECIMEN); + + // When + assertThrows(FailedProcessingException.class, () -> jobRequestComponent.getJobRequest()); + + // Then + then(client).should().updateJobState(JOB_ID, JobStateEndpoint.FAILED.getEndpoint()); + } + +} diff --git a/src/test/java/eu/dissco/exportjob/service/AbstractExportJobServiceTest.java b/src/test/java/eu/dissco/exportjob/service/AbstractExportJobServiceTest.java deleted file mode 100644 index f99ff26..0000000 --- a/src/test/java/eu/dissco/exportjob/service/AbstractExportJobServiceTest.java +++ /dev/null @@ -1,25 +0,0 @@ -package eu.dissco.exportjob.service; - -import eu.dissco.exportjob.repository.ElasticSearchRepository; -import eu.dissco.exportjob.web.ExporterBackendClient; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; - -@ExtendWith(MockitoExtension.class) -class AbstractExportJobServiceTest { - private AbstractExportJobService service; - - @Mock - private ElasticSearchRepository elasticSearchRepository; - @Mock - private ExporterBackendClient exporterBackendClient; - - @BeforeEach - void init(){ - //service = new AbstractExportJobService(elasticSearchRepository, exporterBackendClient); - } - - -} diff --git a/src/test/java/eu/dissco/exportjob/service/DoiListServiceTest.java b/src/test/java/eu/dissco/exportjob/service/DoiListServiceTest.java new file mode 100644 index 0000000..87783b7 --- /dev/null +++ b/src/test/java/eu/dissco/exportjob/service/DoiListServiceTest.java @@ -0,0 +1,70 @@ +package eu.dissco.exportjob.service; + +import static 
eu.dissco.exportjob.utils.TestUtils.DOWNLOAD_LINK; +import static eu.dissco.exportjob.utils.TestUtils.JOB_ID; +import static eu.dissco.exportjob.utils.TestUtils.givenDigitalSpecimen; +import static eu.dissco.exportjob.utils.TestUtils.givenJobRequest; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.then; + +import eu.dissco.exportjob.repository.ElasticSearchRepository; +import eu.dissco.exportjob.repository.S3Repository; +import eu.dissco.exportjob.web.ExporterBackendClient; +import java.util.List; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class DoiListServiceTest { + + private DoiListService service; + + @Mock + private ElasticSearchRepository elasticSearchRepository; + @Mock + private ExporterBackendClient exporterBackendClient; + @Mock + private S3Repository s3Repository; + + @BeforeEach + void init() { + service = new DoiListService(elasticSearchRepository, exporterBackendClient, s3Repository); + } + + @Test + void testHandleMessageNoResultsFound() throws Exception { + // Given + given(elasticSearchRepository.getTotalHits(any(), any())).willReturn(0L); + + // When + service.handleMessage(givenJobRequest()); + + // Then + then(elasticSearchRepository).shouldHaveNoMoreInteractions(); + then(s3Repository).shouldHaveNoInteractions(); + then(exporterBackendClient).should().markJobAsComplete(JOB_ID, null); + } + + @Test + void testHandleMessage() throws Exception { + // GIven + given(elasticSearchRepository.getTotalHits(any(), any())).willReturn(1L); + given(elasticSearchRepository.getTargetObjects(any(), any(), anyInt())).willReturn( + List.of(givenDigitalSpecimen())); + given(s3Repository.uploadResults(any(), eq(JOB_ID))).willReturn(DOWNLOAD_LINK); + + // When + service.handleMessage(givenJobRequest()); + + // Then + then(exporterBackendClient).should().markJobAsComplete(JOB_ID, DOWNLOAD_LINK); + } + + +} diff --git a/src/test/java/eu/dissco/exportjob/utils/TestUtils.java b/src/test/java/eu/dissco/exportjob/utils/TestUtils.java index 80862a3..2ad6838 100644 --- a/src/test/java/eu/dissco/exportjob/utils/TestUtils.java +++ b/src/test/java/eu/dissco/exportjob/utils/TestUtils.java @@ -23,18 +23,20 @@ private TestUtils() { public static final UUID JOB_ID = UUID.fromString("cd5c9ee7-23b1-4615-993e-9d56d0720213"); public static final ObjectMapper MAPPER = new ObjectMapper().findAndRegisterModules() .setSerializationInclusion(Include.NON_NULL); + public static final String DOWNLOAD_LINK = "https://aws.download/s3"; + public static final String ORG_FIELD_NAME = "$['ods:organisationID']"; - private static JobRequest givenJobRequest() { + public static JobRequest givenJobRequest() { return new JobRequest( givenSearchParams(), TargetType.DIGITAL_SPECIMEN, - UUID.randomUUID() + JOB_ID ); } public static List givenSearchParams() { return List.of(new SearchParam( - "$['ods:organisationID']", ORG_1)); + ORG_FIELD_NAME, ORG_1)); } public static JsonNode givenDigitalSpecimen(){ diff --git a/src/test/resources/temp.gz b/src/test/resources/temp.gz new file mode 100644 index 0000000..e69de29 From cdb4237f89eb853671bd25c78a329758948bcbd2 Mon Sep 17 00:00:00 2001 From: southeo Date: Tue, 22 Oct 2024 15:51:36 +0200 Subject: [PATCH 
05/12] testing --- pom.xml | 10 ++ .../component/JobRequestComponent.java | 2 +- .../service/AbstractExportJobService.java | 4 +- .../exportjob/web/ExporterBackendClient.java | 10 +- .../exportjob/web/TokenAuthenticator.java | 7 + .../component/JobRequestComponentTest.java | 2 +- .../web/ExporterBackendClientTest.java | 116 ++++++++++++ .../exportjob/web/TokenAuthenticatorTest.java | 166 ++++++++++++++++++ 8 files changed, 309 insertions(+), 8 deletions(-) create mode 100644 src/test/java/eu/dissco/exportjob/web/ExporterBackendClientTest.java create mode 100644 src/test/java/eu/dissco/exportjob/web/TokenAuthenticatorTest.java diff --git a/pom.xml b/pom.xml index 9645578..1a1a5c6 100644 --- a/pom.xml +++ b/pom.xml @@ -95,6 +95,16 @@ junit-jupiter test + + com.squareup.okhttp3 + okhttp + test + + + com.squareup.okhttp3 + mockwebserver + test + diff --git a/src/main/java/eu/dissco/exportjob/component/JobRequestComponent.java b/src/main/java/eu/dissco/exportjob/component/JobRequestComponent.java index ed22dcb..03ec365 100644 --- a/src/main/java/eu/dissco/exportjob/component/JobRequestComponent.java +++ b/src/main/java/eu/dissco/exportjob/component/JobRequestComponent.java @@ -23,7 +23,7 @@ public JobRequest getJobRequest() throws FailedProcessingException { var searchParams = new ArrayList(); if (properties.getInputFields().size() != properties.getInputValues().size()) { log.error("Mismatch between input fields and input values for searching"); - client.updateJobState(properties.getJobId(), JobStateEndpoint.FAILED.getEndpoint()); + client.updateJobState(properties.getJobId(), JobStateEndpoint.FAILED); throw new FailedProcessingException(); } for (int i = 0; i < properties.getInputFields().size(); i++) { diff --git a/src/main/java/eu/dissco/exportjob/service/AbstractExportJobService.java b/src/main/java/eu/dissco/exportjob/service/AbstractExportJobService.java index b5fda4f..596f7e2 100644 --- a/src/main/java/eu/dissco/exportjob/service/AbstractExportJobService.java +++ b/src/main/java/eu/dissco/exportjob/service/AbstractExportJobService.java @@ -26,7 +26,7 @@ public abstract class AbstractExportJobService { public void handleMessage(JobRequest jobRequest) throws FailedProcessingException { - exporterBackendClient.updateJobState(jobRequest.jobId(), JobStateEndpoint.RUNNING.getEndpoint()); + exporterBackendClient.updateJobState(jobRequest.jobId(), JobStateEndpoint.RUNNING); try { var uploadData = processSearchResults(jobRequest); if (uploadData){ @@ -37,7 +37,7 @@ public void handleMessage(JobRequest jobRequest) throws FailedProcessingExceptio } } catch (IOException e) { log.error("An error has occurred", e); - exporterBackendClient.updateJobState(jobRequest.jobId(), JobStateEndpoint.FAILED.getEndpoint()); + exporterBackendClient.updateJobState(jobRequest.jobId(), JobStateEndpoint.FAILED); } } diff --git a/src/main/java/eu/dissco/exportjob/web/ExporterBackendClient.java b/src/main/java/eu/dissco/exportjob/web/ExporterBackendClient.java index bdb9d58..9cb2104 100644 --- a/src/main/java/eu/dissco/exportjob/web/ExporterBackendClient.java +++ b/src/main/java/eu/dissco/exportjob/web/ExporterBackendClient.java @@ -1,6 +1,7 @@ package eu.dissco.exportjob.web; import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dissco.exportjob.domain.JobStateEndpoint; import eu.dissco.exportjob.exceptions.FailedProcessingException; import java.util.UUID; import java.util.concurrent.ExecutionException; @@ -22,17 +23,18 @@ public class ExporterBackendClient { private final ObjectMapper mapper; private final 
TokenAuthenticator tokenAuthenticator; - public void updateJobState(UUID jobId, String path) throws FailedProcessingException { + public void updateJobState(UUID jobId, JobStateEndpoint stateEndpoint) throws FailedProcessingException { + var endpoint = stateEndpoint.getEndpoint(); try { webClient .method(HttpMethod.POST) - .uri(uriBuilder -> uriBuilder.path("/" + jobId.toString() + path).build()) + .uri(uriBuilder -> uriBuilder.path("/" + jobId.toString() + endpoint).build()) .header("Authorization", "Bearer " + tokenAuthenticator.getToken()) .retrieve() .toBodilessEntity().toFuture().get(); } catch (ExecutionException e) { log.error("Unable to notify exporter backend that job {} is {}", jobId, - path.replace("/", "")); + endpoint.replace("/", "")); } catch (InterruptedException e) { log.error("Thread has been interrupted", e); Thread.currentThread().interrupt(); @@ -53,7 +55,7 @@ public void markJobAsComplete(UUID jobId, String downloadLink) throws FailedProc .retrieve() .toBodilessEntity().toFuture().get(); } catch (ExecutionException e) { - log.error("Unable to notify exporter backend that job {} is running", jobId); + log.error("Unable to notify exporter backend that job {} is complete", jobId); } catch (InterruptedException e){ Thread.currentThread().interrupt(); log.error("Thread has been interrupted", e); diff --git a/src/main/java/eu/dissco/exportjob/web/TokenAuthenticator.java b/src/main/java/eu/dissco/exportjob/web/TokenAuthenticator.java index 3fe249e..5cace23 100644 --- a/src/main/java/eu/dissco/exportjob/web/TokenAuthenticator.java +++ b/src/main/java/eu/dissco/exportjob/web/TokenAuthenticator.java @@ -13,6 +13,7 @@ import org.springframework.stereotype.Component; import org.springframework.web.reactive.function.BodyInserters; import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.reactive.function.client.WebClientResponseException; import reactor.core.publisher.Mono; import reactor.util.retry.Retry; @@ -35,6 +36,7 @@ public String getToken() throws FailedProcessingException { r -> Mono.error(new FailedProcessingException("Service is unauthorized."))) .bodyToMono(JsonNode.class) .retryWhen(Retry.fixedDelay(3, Duration.ofSeconds(2)) + .filter(TokenAuthenticator::is5xxServerError) .onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> new FailedProcessingException( "Token Authentication failed to process after max retries") @@ -60,4 +62,9 @@ private String getToken(JsonNode tokenNode) throws FailedProcessingException { "Unable to authenticate processing service with Keycloak. 
An error has occurred parsing keycloak response"); } + private static boolean is5xxServerError(Throwable throwable) { + return throwable instanceof WebClientResponseException webClientResponseException + && webClientResponseException.getStatusCode().is5xxServerError(); + } + } diff --git a/src/test/java/eu/dissco/exportjob/component/JobRequestComponentTest.java b/src/test/java/eu/dissco/exportjob/component/JobRequestComponentTest.java index e5a67a3..68c12d5 100644 --- a/src/test/java/eu/dissco/exportjob/component/JobRequestComponentTest.java +++ b/src/test/java/eu/dissco/exportjob/component/JobRequestComponentTest.java @@ -68,7 +68,7 @@ void testHandleMessageInvalidParams() throws FailedProcessingException { assertThrows(FailedProcessingException.class, () -> jobRequestComponent.getJobRequest()); // Then - then(client).should().updateJobState(JOB_ID, JobStateEndpoint.FAILED.getEndpoint()); + then(client).should().updateJobState(JOB_ID, JobStateEndpoint.FAILED); } } diff --git a/src/test/java/eu/dissco/exportjob/web/ExporterBackendClientTest.java b/src/test/java/eu/dissco/exportjob/web/ExporterBackendClientTest.java new file mode 100644 index 0000000..161fe55 --- /dev/null +++ b/src/test/java/eu/dissco/exportjob/web/ExporterBackendClientTest.java @@ -0,0 +1,116 @@ +package eu.dissco.exportjob.web; + +import static eu.dissco.exportjob.utils.TestUtils.DOWNLOAD_LINK; +import static eu.dissco.exportjob.utils.TestUtils.JOB_ID; +import static eu.dissco.exportjob.utils.TestUtils.MAPPER; +import static org.junit.Assert.assertThrows; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; + +import eu.dissco.exportjob.domain.JobStateEndpoint; +import eu.dissco.exportjob.exceptions.FailedProcessingException; +import java.io.IOException; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.http.HttpStatus; +import org.springframework.web.reactive.function.client.WebClient; + +@ExtendWith(MockitoExtension.class) +class ExporterBackendClientTest { + + private static MockWebServer mockServer; + @Mock + private TokenAuthenticator tokenAuthenticator; + private ExporterBackendClient exporterBackendClient; + + + @BeforeAll + static void init() throws IOException { + mockServer = new MockWebServer(); + mockServer.start(); + } + + @BeforeEach + void setup() { + WebClient webClient = WebClient.create( + String.format("http://%s:%s", mockServer.getHostName(), mockServer.getPort())); + exporterBackendClient = new ExporterBackendClient(webClient, MAPPER, tokenAuthenticator); + } + + @AfterAll + static void destroy() throws IOException { + mockServer.shutdown(); + } + + @Test + void testUpdateJobState() { + mockServer.enqueue(new MockResponse() + .setResponseCode(HttpStatus.NO_CONTENT.value())); + + // When / Then + assertDoesNotThrow(() -> exporterBackendClient.updateJobState(JOB_ID, + JobStateEndpoint.RUNNING)); + } + + @Test + void testUpdateJobStateFailedToNotify() { + mockServer.enqueue(new MockResponse() + .setResponseCode(HttpStatus.BAD_GATEWAY.value())); + + // When / Then + assertDoesNotThrow(() -> exporterBackendClient.updateJobState(JOB_ID, + JobStateEndpoint.RUNNING)); + } + + @Test + void testUpdateJobStateInterrupted() { + // Given + 
mockServer.enqueue(new MockResponse() + .setResponseCode(HttpStatus.BAD_GATEWAY.value())); + Thread.currentThread().interrupt(); + + // When / Then + assertThrows(FailedProcessingException.class, + () -> exporterBackendClient.updateJobState(JOB_ID, JobStateEndpoint.RUNNING)); + } + + @Test + void testMarkJobAsComplete() { + // Given + mockServer.enqueue(new MockResponse() + .setResponseCode(HttpStatus.NO_CONTENT.value())); + + // When / Then + assertDoesNotThrow(() -> exporterBackendClient.markJobAsComplete(JOB_ID, DOWNLOAD_LINK)); + } + + @Test + void testMarkJobAsCompleteFailedToNotify() { + // Given + mockServer.enqueue(new MockResponse() + .setResponseCode(HttpStatus.BAD_GATEWAY.value())); + + // When / Then + assertDoesNotThrow(() -> exporterBackendClient.markJobAsComplete(JOB_ID, DOWNLOAD_LINK)); + } + + @Test + void testMarkJobAsCompleteInterrupted() { + // Given + mockServer.enqueue(new MockResponse() + .setResponseCode(HttpStatus.BAD_GATEWAY.value())); + Thread.currentThread().interrupt(); + + // When / Then + assertThrows(FailedProcessingException.class, + () -> exporterBackendClient.markJobAsComplete(JOB_ID, DOWNLOAD_LINK)); + } + + +} diff --git a/src/test/java/eu/dissco/exportjob/web/TokenAuthenticatorTest.java b/src/test/java/eu/dissco/exportjob/web/TokenAuthenticatorTest.java new file mode 100644 index 0000000..be34a52 --- /dev/null +++ b/src/test/java/eu/dissco/exportjob/web/TokenAuthenticatorTest.java @@ -0,0 +1,166 @@ +package eu.dissco.exportjob.web; + +import static eu.dissco.exportjob.utils.TestUtils.MAPPER; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.BDDMockito.given; + +import com.fasterxml.jackson.databind.JsonNode; +import eu.dissco.exportjob.exceptions.FailedProcessingException; +import eu.dissco.exportjob.properties.TokenProperties; +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.http.HttpStatus; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; +import org.springframework.web.reactive.function.client.WebClient; + +@ExtendWith(MockitoExtension.class) +class TokenAuthenticatorTest { + + private static MockWebServer mockTokenServer; + private final MultiValueMap testFromFormData = new LinkedMultiValueMap<>() {{ + add("grant_type", "grantType"); + add("client_id", "clientId"); + add("client_secret", "secret"); + }}; + @Mock + private TokenProperties properties; + @Mock + private CompletableFuture jsonFuture; + private TokenAuthenticator authenticator; + + @BeforeAll + static void init() throws IOException { + mockTokenServer = new MockWebServer(); + mockTokenServer.start(); + } + + @AfterAll + static void destroy() throws IOException { + mockTokenServer.shutdown(); + } + + private static JsonNode givenTokenResponse() throws Exception { + return MAPPER.readTree(""" + { + "access_token": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\ + aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\ + 
aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\ + aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + "expires_in": 3600, + "refresh_expires_in": 0, + "token_type": "Bearer", + "not-before-policy": 0, + "scope": "" + } + """); + } + + @BeforeEach + void setup() { + WebClient webClient = WebClient.create( + String.format("http://%s:%s", mockTokenServer.getHostName(), mockTokenServer.getPort())); + authenticator = new TokenAuthenticator(webClient, properties); + } + + @Test + void testGetToken() throws Exception { + // Given + var expectedJson = givenTokenResponse(); + var expected = expectedJson.get("access_token").asText(); + given(properties.getFromFormData()).willReturn(testFromFormData); + + mockTokenServer.enqueue(new MockResponse() + .setResponseCode(HttpStatus.OK.value()) + .setBody(MAPPER.writeValueAsString(expectedJson)) + .addHeader("Content-Type", "application/json")); + + // When + var response = authenticator.getToken(); + + // Then + assertThat(response).isEqualTo(expected); + } + + @Test + void testGetTokenUnauthorized() { + // Given + given(properties.getFromFormData()).willReturn(testFromFormData); + mockTokenServer.enqueue(new MockResponse() + .setResponseCode(HttpStatus.UNAUTHORIZED.value()) + .addHeader("Content-Type", "application/json")); + + // Then + assertThrows(FailedProcessingException.class, () -> authenticator.getToken()); + } + + @Test + void testRetriesSuccess() throws Exception { + // Given + int requestCount = mockTokenServer.getRequestCount(); + var expectedJson = givenTokenResponse(); + var expected = expectedJson.get("access_token").asText(); + given(properties.getFromFormData()).willReturn(testFromFormData); + mockTokenServer.enqueue(new MockResponse().setResponseCode(501)); + mockTokenServer.enqueue(new MockResponse() + .setResponseCode(HttpStatus.OK.value()) + .setBody(MAPPER.writeValueAsString(expectedJson)) + .addHeader("Content-Type", "application/json")); + + // When + var response = authenticator.getToken(); + + // Then + assertThat(response).isEqualTo(expected); + assertThat(mockTokenServer.getRequestCount() - requestCount).isEqualTo(2); + } + + @Test + void testRetriesFailure() { + // Given + int requestCount = mockTokenServer.getRequestCount(); + given(properties.getFromFormData()).willReturn(testFromFormData); + mockTokenServer.enqueue(new MockResponse().setResponseCode(501)); + mockTokenServer.enqueue(new MockResponse().setResponseCode(501)); + mockTokenServer.enqueue(new MockResponse().setResponseCode(501)); + mockTokenServer.enqueue(new MockResponse().setResponseCode(501)); + + // Then + assertThrows(FailedProcessingException.class, () -> authenticator.getToken()); + assertThat(mockTokenServer.getRequestCount() - requestCount).isEqualTo(4); + } + + @Test + void testGetResponseIsNull() { + given(properties.getFromFormData()).willReturn(testFromFormData); + mockTokenServer.enqueue(new MockResponse() + .setResponseCode(HttpStatus.OK.value()) + .addHeader("Content-Type", "application/json")); + + // When + assertThrows(FailedProcessingException.class, () -> authenticator.getToken()); + } + + @Test + void testGetTokenIsNull() { + given(properties.getFromFormData()).willReturn(testFromFormData); + mockTokenServer.enqueue(new MockResponse() + .setResponseCode(HttpStatus.OK.value()) + .addHeader("Content-Type", "application/json") + .setBody("{}")); + + // When + assertThrows(FailedProcessingException.class, () -> authenticator.getToken()); + } + +} From ac4fe8d1fc5788751f6312b738e9d5136af73cc9 Mon Sep 
17 00:00:00 2001 From: southeo Date: Tue, 22 Oct 2024 15:56:52 +0200 Subject: [PATCH 06/12] testing --- .../java/eu/dissco/exportjob/service/DoiListServiceTest.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/test/java/eu/dissco/exportjob/service/DoiListServiceTest.java b/src/test/java/eu/dissco/exportjob/service/DoiListServiceTest.java index 87783b7..c196d0c 100644 --- a/src/test/java/eu/dissco/exportjob/service/DoiListServiceTest.java +++ b/src/test/java/eu/dissco/exportjob/service/DoiListServiceTest.java @@ -1,5 +1,6 @@ package eu.dissco.exportjob.service; +import static eu.dissco.exportjob.Profiles.DOI_LIST; import static eu.dissco.exportjob.utils.TestUtils.DOWNLOAD_LINK; import static eu.dissco.exportjob.utils.TestUtils.JOB_ID; import static eu.dissco.exportjob.utils.TestUtils.givenDigitalSpecimen; @@ -19,8 +20,10 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.test.context.ActiveProfiles; @ExtendWith(MockitoExtension.class) +@ActiveProfiles(profiles = DOI_LIST) class DoiListServiceTest { private DoiListService service; From ce938ed9b4783721bc51c3fc975fd3606d53c266 Mon Sep 17 00:00:00 2001 From: southeo Date: Tue, 22 Oct 2024 16:04:40 +0200 Subject: [PATCH 07/12] testing --- src/main/resources/tmp.csv.gz | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 src/main/resources/tmp.csv.gz diff --git a/src/main/resources/tmp.csv.gz b/src/main/resources/tmp.csv.gz new file mode 100644 index 0000000..e69de29 From f38d2a9588afde5b3344dc45a048565745b980d3 Mon Sep 17 00:00:00 2001 From: southeo Date: Tue, 22 Oct 2024 16:29:38 +0200 Subject: [PATCH 08/12] sonar --- pom.xml | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/pom.xml b/pom.xml index 1a1a5c6..091d9e2 100644 --- a/pom.xml +++ b/pom.xml @@ -36,6 +36,10 @@ 2.1.3 5.9 2.25.60 + dissco + https://sonarcloud.io + ../app-it/target/site/jacoco-aggregate/jacoco.xml + @@ -114,6 +118,38 @@ org.springframework.boot spring-boot-maven-plugin + + org.jacoco + jacoco-maven-plugin + 0.8.7 + + + prepare-agent + + prepare-agent + + + + prepare-agent-integration + + prepare-agent-integration + + + + report + prepare-package + + report + + + + report-integration + + report-integration + + + + From f52ebb6ab89ab99e723612ce6d10a64eced3a9c4 Mon Sep 17 00:00:00 2001 From: southeo Date: Tue, 22 Oct 2024 16:32:50 +0200 Subject: [PATCH 09/12] jacoco --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 091d9e2..b3911ec 100644 --- a/pom.xml +++ b/pom.xml @@ -121,7 +121,7 @@ org.jacoco jacoco-maven-plugin - 0.8.7 + 0.8.12 prepare-agent From 525d8cf6f581d5ae7788dc81c011b0827f422d5e Mon Sep 17 00:00:00 2001 From: southeo Date: Tue, 22 Oct 2024 16:47:30 +0200 Subject: [PATCH 10/12] jacoco --- ...SearchRepositoryIT.java => ElasticSearchRepositoryTest.java} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename src/test/java/eu/dissco/exportjob/repository/{ElasticSearchRepositoryIT.java => ElasticSearchRepositoryTest.java} (99%) diff --git a/src/test/java/eu/dissco/exportjob/repository/ElasticSearchRepositoryIT.java b/src/test/java/eu/dissco/exportjob/repository/ElasticSearchRepositoryTest.java similarity index 99% rename from src/test/java/eu/dissco/exportjob/repository/ElasticSearchRepositoryIT.java rename to src/test/java/eu/dissco/exportjob/repository/ElasticSearchRepositoryTest.java index 19cf9eb..43ffe67 100644 --- 
a/src/test/java/eu/dissco/exportjob/repository/ElasticSearchRepositoryIT.java +++ b/src/test/java/eu/dissco/exportjob/repository/ElasticSearchRepositoryTest.java @@ -37,7 +37,7 @@ import org.testcontainers.utility.DockerImageName; @Testcontainers -public class ElasticSearchRepositoryIT { +public class ElasticSearchRepositoryTest { private static final DockerImageName ELASTIC_IMAGE = DockerImageName.parse( "docker.elastic.co/elasticsearch/elasticsearch").withTag("8.7.1"); From 6e9f51770a31ef092a93539264a44e0eba5d6efc Mon Sep 17 00:00:00 2001 From: southeo Date: Mon, 28 Oct 2024 16:34:14 +0100 Subject: [PATCH 11/12] code reivew --- .../ApplicationConfiguration.java | 8 ++++ .../repository/ElasticSearchRepository.java | 43 +++++++----------- .../exportjob/repository/S3Repository.java | 8 +++- .../service/AbstractExportJobService.java | 36 +++++++++------ .../exportjob/service/DoiListService.java | 12 +++-- src/main/resources/tmp.csv.gz | Bin 0 -> 61 bytes .../ElasticSearchRepositoryTest.java | 20 ++++---- .../exportjob/service/DoiListServiceTest.java | 8 ++-- .../eu/dissco/exportjob/utils/TestUtils.java | 20 +++++++- 9 files changed, 94 insertions(+), 61 deletions(-) diff --git a/src/main/java/eu/dissco/exportjob/configuration/ApplicationConfiguration.java b/src/main/java/eu/dissco/exportjob/configuration/ApplicationConfiguration.java index 93cfbd5..3f8377a 100644 --- a/src/main/java/eu/dissco/exportjob/configuration/ApplicationConfiguration.java +++ b/src/main/java/eu/dissco/exportjob/configuration/ApplicationConfiguration.java @@ -2,6 +2,8 @@ import com.fasterxml.jackson.annotation.JsonInclude.Include; import com.fasterxml.jackson.databind.ObjectMapper; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; import lombok.RequiredArgsConstructor; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -16,4 +18,10 @@ public ObjectMapper objectMapper() { mapper.setSerializationInclusion(Include.NON_NULL); return mapper; } + + @Bean + public DateTimeFormatter formatter() { + return DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC); + } + } diff --git a/src/main/java/eu/dissco/exportjob/repository/ElasticSearchRepository.java b/src/main/java/eu/dissco/exportjob/repository/ElasticSearchRepository.java index dc57fc5..36910b4 100644 --- a/src/main/java/eu/dissco/exportjob/repository/ElasticSearchRepository.java +++ b/src/main/java/eu/dissco/exportjob/repository/ElasticSearchRepository.java @@ -1,8 +1,8 @@ package eu.dissco.exportjob.repository; import co.elastic.clients.elasticsearch.ElasticsearchClient; +import co.elastic.clients.elasticsearch._types.SortOrder; import co.elastic.clients.elasticsearch._types.query_dsl.Query; -import co.elastic.clients.elasticsearch.core.CountRequest; import co.elastic.clients.elasticsearch.core.SearchRequest; import co.elastic.clients.elasticsearch.core.search.Hit; import com.fasterxml.jackson.databind.JsonNode; @@ -22,36 +22,34 @@ @Repository @RequiredArgsConstructor public class ElasticSearchRepository { + private final ElasticsearchClient client; private final ElasticSearchProperties properties; - - public Long getTotalHits(List searchParams, TargetType targetType) - throws IOException { - var query = generateQuery(searchParams); - var index = getIndex(targetType); - var countRequest = new CountRequest.Builder() - .index(index) - .query( - q -> q.bool(b -> b.must(query))) - .build(); - return client.count(countRequest).count(); - } + private static final String 
SORT_BY = "dcterms:identifier.keyword"; public List getTargetObjects(List searchParams, TargetType targetType, - int pageNumber) + String lastId, List targetFields) throws IOException { var query = generateQuery(searchParams); var index = getIndex(targetType); - var searchRequest = new SearchRequest.Builder() + var searchRequestBuilder = new SearchRequest.Builder() .index(index) .query( q -> q.bool(b -> b.must(query))) .trackTotalHits(t -> t.enabled(Boolean.TRUE)) - .from(getOffset(pageNumber, properties.getPageSize())) .size(properties.getPageSize()) - .build(); - var searchResult = client.search(searchRequest, ObjectNode.class); + .sort(s -> s.field(f -> f.field(SORT_BY).order(SortOrder.Desc))); + if (lastId != null) { + searchRequestBuilder + .searchAfter(sa -> sa.stringValue(lastId)); + } + if (targetFields != null) { + searchRequestBuilder + .source(sourceConfig -> sourceConfig + .filter(filter -> filter.includes(targetFields))); + } + var searchResult = client.search(searchRequestBuilder.build(), ObjectNode.class); return searchResult.hits().hits().stream() .map(Hit::source) @@ -65,7 +63,6 @@ private String getIndex(TargetType targetType) { : properties.getDigitalMediaObjectIndex(); } - private static List generateQuery(List searchParams) { var qList = new ArrayList(); for (var searchParam : searchParams) { @@ -88,12 +85,4 @@ private static List generateQuery(List searchParams) { return qList; } - private static int getOffset(int pageNumber, int pageSize) { - int offset = 0; - if (pageNumber > 1) { - offset = offset + (pageSize * (pageNumber - 1)); - } - return offset; - } - } diff --git a/src/main/java/eu/dissco/exportjob/repository/S3Repository.java b/src/main/java/eu/dissco/exportjob/repository/S3Repository.java index b548667..89c993f 100644 --- a/src/main/java/eu/dissco/exportjob/repository/S3Repository.java +++ b/src/main/java/eu/dissco/exportjob/repository/S3Repository.java @@ -1,6 +1,8 @@ package eu.dissco.exportjob.repository; import java.io.File; +import java.time.Instant; +import java.time.format.DateTimeFormatter; import java.util.UUID; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Repository; @@ -12,6 +14,7 @@ public class S3Repository { private final S3AsyncClient s3Client; + private final DateTimeFormatter formatter; private static final String BUCKET_NAME = "dissco-data-export"; public String uploadResults(File file, UUID jobId) { @@ -20,7 +23,7 @@ public String uploadResults(File file, UUID jobId) { .uploadFile(uploadFileRequest -> uploadFileRequest .putObjectRequest(putObjectRequest -> putObjectRequest .bucket(BUCKET_NAME) - .key(jobId.toString())) + .key(getDate() + "/" + jobId.toString())) .source(file)); upload.completionFuture().join(); return s3Client.utilities().getUrl( @@ -31,5 +34,8 @@ public String uploadResults(File file, UUID jobId) { } } + private String getDate() { + return formatter.format(Instant.now()); + } } diff --git a/src/main/java/eu/dissco/exportjob/service/AbstractExportJobService.java b/src/main/java/eu/dissco/exportjob/service/AbstractExportJobService.java index 596f7e2..fe63996 100644 --- a/src/main/java/eu/dissco/exportjob/service/AbstractExportJobService.java +++ b/src/main/java/eu/dissco/exportjob/service/AbstractExportJobService.java @@ -1,5 +1,8 @@ package eu.dissco.exportjob.service; +import static eu.dissco.exportjob.service.DoiListService.ID_FIELD; + +import co.elastic.clients.elasticsearch._types.query_dsl.FieldAndFormat; import com.fasterxml.jackson.databind.JsonNode; import 
eu.dissco.exportjob.domain.JobRequest; import eu.dissco.exportjob.domain.JobStateEndpoint; @@ -23,13 +26,15 @@ public abstract class AbstractExportJobService { private final ExporterBackendClient exporterBackendClient; private final S3Repository s3Repository; protected static final String TEMP_FILE_NAME = "src/main/resources/tmp.csv.gz"; + protected static final String ID_FIELD = "dcterms:identifier"; + protected static final String PHYSICAL_ID_FIELD = "ods:physicalSpecimenID"; public void handleMessage(JobRequest jobRequest) throws FailedProcessingException { exporterBackendClient.updateJobState(jobRequest.jobId(), JobStateEndpoint.RUNNING); try { var uploadData = processSearchResults(jobRequest); - if (uploadData){ + if (uploadData) { var url = s3Repository.uploadResults(new File(TEMP_FILE_NAME), jobRequest.jobId()); exporterBackendClient.markJobAsComplete(jobRequest.jobId(), url); } else { @@ -42,26 +47,29 @@ public void handleMessage(JobRequest jobRequest) throws FailedProcessingExceptio } private boolean processSearchResults(JobRequest jobRequest) throws IOException { - var totalHits = elasticSearchRepository.getTotalHits(jobRequest.searchParams(), - jobRequest.targetType()); - if (totalHits > 0){ - int pageNum = 1; - var hitsProcessed = 0; - writeHeaderToFile(); - while (hitsProcessed < totalHits) { - var searchResult = elasticSearchRepository.getTargetObjects(jobRequest.searchParams(), - jobRequest.targetType(), pageNum); + String lastId = null; + writeHeaderToFile(); + boolean keepSearching = true; + boolean resultsProcessed = false; + var targetFields = targetFields(); + while (keepSearching) { + var searchResult = elasticSearchRepository.getTargetObjects(jobRequest.searchParams(), + jobRequest.targetType(), lastId, targetFields); + if (searchResult.isEmpty()){ + keepSearching = false; + } else { writeResultsToFile(searchResult); - hitsProcessed = hitsProcessed + searchResult.size(); - pageNum = pageNum + 1; + lastId = searchResult.getLast().get(ID_FIELD).asText(); + resultsProcessed = true; } - return true; } - return false; + return resultsProcessed; } protected abstract void writeHeaderToFile() throws IOException; protected abstract void writeResultsToFile(List searchResult) throws IOException; + protected abstract List targetFields(); + } diff --git a/src/main/java/eu/dissco/exportjob/service/DoiListService.java b/src/main/java/eu/dissco/exportjob/service/DoiListService.java index 57a3921..1624ea7 100644 --- a/src/main/java/eu/dissco/exportjob/service/DoiListService.java +++ b/src/main/java/eu/dissco/exportjob/service/DoiListService.java @@ -17,9 +17,7 @@ @Profile(Profiles.DOI_LIST) public class DoiListService extends AbstractExportJobService { - private static final String ODS_ID = "ods:ID"; - private static final String PHYSICAL_ID = "ods:physicalSpecimenID"; - private static final byte[] HEADER = (ODS_ID + "," + PHYSICAL_ID).getBytes( + private static final byte[] HEADER = (ID_FIELD + "," + PHYSICAL_ID_FIELD).getBytes( StandardCharsets.UTF_8); public DoiListService( @@ -41,10 +39,16 @@ protected void writeResultsToFile(List searchResults) throws IOExcepti var byteOutputStream = new FileOutputStream(TEMP_FILE_NAME, true); var gzip = new GZIPOutputStream(byteOutputStream)) { for (var result : searchResults) { - var col = ("\n" + result.get(ODS_ID).asText() + "," + result.get(PHYSICAL_ID).asText()) + var col = ("\n" + result.get(ID_FIELD).asText() + "," + result.get(PHYSICAL_ID_FIELD).asText()) .getBytes(StandardCharsets.UTF_8); gzip.write(col, 0, col.length); } } } + + 
diff --git a/src/main/java/eu/dissco/exportjob/service/DoiListService.java b/src/main/java/eu/dissco/exportjob/service/DoiListService.java
index 57a3921..1624ea7 100644
--- a/src/main/java/eu/dissco/exportjob/service/DoiListService.java
+++ b/src/main/java/eu/dissco/exportjob/service/DoiListService.java
@@ -17,9 +17,7 @@
 @Profile(Profiles.DOI_LIST)
 public class DoiListService extends AbstractExportJobService {

-  private static final String ODS_ID = "ods:ID";
-  private static final String PHYSICAL_ID = "ods:physicalSpecimenID";
-  private static final byte[] HEADER = (ODS_ID + "," + PHYSICAL_ID).getBytes(
+  private static final byte[] HEADER = (ID_FIELD + "," + PHYSICAL_ID_FIELD).getBytes(
       StandardCharsets.UTF_8);

   public DoiListService(
@@ -41,10 +39,16 @@ protected void writeResultsToFile(List<JsonNode> searchResults) throws IOExcepti
         var byteOutputStream = new FileOutputStream(TEMP_FILE_NAME, true);
         var gzip = new GZIPOutputStream(byteOutputStream)) {
       for (var result : searchResults) {
-        var col = ("\n" + result.get(ODS_ID).asText() + "," + result.get(PHYSICAL_ID).asText())
+        var col = ("\n" + result.get(ID_FIELD).asText() + "," + result.get(PHYSICAL_ID_FIELD).asText())
             .getBytes(StandardCharsets.UTF_8);
         gzip.write(col, 0, col.length);
       }
     }
   }
+
+  protected List<String> targetFields(){
+    return List.of(ID_FIELD, PHYSICAL_ID_FIELD);
+  }
+
+
 }
diff --git a/src/main/resources/tmp.csv.gz b/src/main/resources/tmp.csv.gz
index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..9056a7e9e52db33d1114d444771b4ecea3130444 100644
GIT binary patch
literal 61
zcmb2|=3oGW|K6T@p1P-Wx1I9wJga@u=d_RRrIY@e+q6!s&^hPhcZO5X!}r`7-?N`f
O7#T7mzZ}*C>H+|%pcj(>

literal 0
HcmV?d00001
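The DOI list export written above is a gzip-compressed CSV: the header row is dcterms:identifier,ods:physicalSpecimenID and every page of hits is appended as a further gzip member. A sketch of a consumer that assumes only that format (class name and file path are illustrative):

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;

class DoiListReader {

  // Streams the gzip'd CSV produced by the DOI list job and prints each pair.
  static void printDoiList(String path) throws IOException {
    try (var reader = new BufferedReader(new InputStreamReader(
        new GZIPInputStream(new FileInputStream(path)), StandardCharsets.UTF_8))) {
      System.out.println("columns: " + reader.readLine());   // header row
      reader.lines()
          .map(line -> line.split(",", 2))                   // [doi, physicalSpecimenID]
          .forEach(cols -> System.out.println(cols[0] + " -> " + cols[1]));
    }
  }
}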
diff --git a/src/test/java/eu/dissco/exportjob/repository/ElasticSearchRepositoryTest.java b/src/test/java/eu/dissco/exportjob/repository/ElasticSearchRepositoryTest.java
index 43ffe67..a89a748 100644
--- a/src/test/java/eu/dissco/exportjob/repository/ElasticSearchRepositoryTest.java
+++ b/src/test/java/eu/dissco/exportjob/repository/ElasticSearchRepositoryTest.java
@@ -2,10 +2,13 @@
 import static eu.dissco.exportjob.utils.TestUtils.DOI_2;
 import static eu.dissco.exportjob.utils.TestUtils.MAPPER;
+import static eu.dissco.exportjob.utils.TestUtils.ORG_1;
 import static eu.dissco.exportjob.utils.TestUtils.ORG_2;
 import static eu.dissco.exportjob.utils.TestUtils.PHYS_ID_2;
 import static eu.dissco.exportjob.utils.TestUtils.givenDigitalSpecimen;
+import static eu.dissco.exportjob.utils.TestUtils.givenDigitalSpecimenReducedDoiList;
 import static eu.dissco.exportjob.utils.TestUtils.givenSearchParams;
+import static eu.dissco.exportjob.utils.TestUtils.givenTargetFields;
 import static org.assertj.core.api.Assertions.assertThat;

 import co.elastic.clients.elasticsearch.ElasticsearchClient;
@@ -99,30 +102,31 @@ void clearIndex() throws IOException {
   }

   @Test
-  void testGetTotalHits() throws IOException {
+  void testGetTargetObject() throws IOException {
     // Given
     postDigitalSpecimens(
         List.of(givenDigitalSpecimen(), givenDigitalSpecimen(DOI_2, ORG_2, PHYS_ID_2)));

     // When
-    var result = elasticRepository.getTotalHits(givenSearchParams(), TargetType.DIGITAL_SPECIMEN);
+    var result = elasticRepository.getTargetObjects(givenSearchParams(),
+        TargetType.DIGITAL_SPECIMEN, null, givenTargetFields());

     // Then
-    assertThat(result).isEqualTo(1L);
+    assertThat(result).isEqualTo(List.of(givenDigitalSpecimenReducedDoiList()));
   }

   @Test
-  void testGetTargetObject() throws IOException {
+  void testGetTargetObjectSecondPage() throws IOException {
     // Given
     postDigitalSpecimens(
-        List.of(givenDigitalSpecimen(), givenDigitalSpecimen(DOI_2, ORG_2, PHYS_ID_2)));
+        List.of(givenDigitalSpecimen(), givenDigitalSpecimen(DOI_2, ORG_1, PHYS_ID_2)));

     // When
     var result = elasticRepository.getTargetObjects(givenSearchParams(),
-        TargetType.DIGITAL_SPECIMEN, 1);
+        TargetType.DIGITAL_SPECIMEN, DOI_2, givenTargetFields());

     // Then
-    assertThat(result).isEqualTo(List.of(givenDigitalSpecimen()));
+    assertThat(result).isEqualTo(List.of(givenDigitalSpecimenReducedDoiList()));
   }

   @Test
@@ -135,7 +139,7 @@ void testGetTargetObjectEmptyOrg() throws IOException {
     var searchParam = List.of(new SearchParam("$['ods:organisationID']", null));

     // When
-    var result = elasticRepository.getTargetObjects(searchParam, TargetType.DIGITAL_SPECIMEN, 1);
+    var result = elasticRepository.getTargetObjects(searchParam, TargetType.DIGITAL_SPECIMEN, null, null);

     // Then
     assertThat(result).isEqualTo(List.of(expected));
diff --git a/src/test/java/eu/dissco/exportjob/service/DoiListServiceTest.java b/src/test/java/eu/dissco/exportjob/service/DoiListServiceTest.java
index c196d0c..fb609d1 100644
--- a/src/test/java/eu/dissco/exportjob/service/DoiListServiceTest.java
+++ b/src/test/java/eu/dissco/exportjob/service/DoiListServiceTest.java
@@ -6,7 +6,6 @@
 import static eu.dissco.exportjob.utils.TestUtils.givenDigitalSpecimen;
 import static eu.dissco.exportjob.utils.TestUtils.givenJobRequest;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.BDDMockito.given;
 import static org.mockito.BDDMockito.then;
@@ -43,8 +42,8 @@ void init() {

   @Test
   void testHandleMessageNoResultsFound() throws Exception {
     // Given
-    given(elasticSearchRepository.getTotalHits(any(), any())).willReturn(0L);
-
+    given(elasticSearchRepository.getTargetObjects(any(), any(), eq(null), any())).willReturn(
+        List.of());
     // When
     service.handleMessage(givenJobRequest());
@@ -57,8 +56,7 @@ void testHandleMessageNoResultsFound() throws Exception {

   @Test
   void testHandleMessage() throws Exception {
     // GIven
-    given(elasticSearchRepository.getTotalHits(any(), any())).willReturn(1L);
-    given(elasticSearchRepository.getTargetObjects(any(), any(), anyInt())).willReturn(
+    given(elasticSearchRepository.getTargetObjects(any(), any(), eq(null), any())).willReturn(
         List.of(givenDigitalSpecimen()));
     given(s3Repository.uploadResults(any(), eq(JOB_ID))).willReturn(DOWNLOAD_LINK);
diff --git a/src/test/java/eu/dissco/exportjob/utils/TestUtils.java b/src/test/java/eu/dissco/exportjob/utils/TestUtils.java
index 2ad6838..c787931 100644
--- a/src/test/java/eu/dissco/exportjob/utils/TestUtils.java
+++ b/src/test/java/eu/dissco/exportjob/utils/TestUtils.java
@@ -25,6 +25,8 @@ private TestUtils() {
       .setSerializationInclusion(Include.NON_NULL);
   public static final String DOWNLOAD_LINK = "https://aws.download/s3";
   public static final String ORG_FIELD_NAME = "$['ods:organisationID']";
+  public static final String ID_FIELD = "dcterms:identifier";
+  public static final String PHYS_ID_FIELD = "ods:physicalSpecimenID";

   public static JobRequest givenJobRequest() {
     return new JobRequest(
@@ -45,9 +47,23 @@ public static JsonNode givenDigitalSpecimen(){

   public static JsonNode givenDigitalSpecimen(String doi, String org, String physId){
     return MAPPER.createObjectNode()
-        .put("ods:ID", doi)
+        .put(ID_FIELD, doi)
         .put("@id", doi)
         .put("ods:organisationID", org)
-        .put("ods:physicalSpecimenID", physId);
+        .put(PHYS_ID_FIELD, physId);
+  }
+
+  public static JsonNode givenDigitalSpecimenReducedDoiList(){
+    return givenDigitalSpecimenReducedDoiList(DOI_1, PHYS_ID_1);
+  }
+
+  public static JsonNode givenDigitalSpecimenReducedDoiList(String doi, String physId){
+    return MAPPER.createObjectNode()
+        .put(ID_FIELD, doi)
+        .put(PHYS_ID_FIELD, physId);
+  }
+
+  public static List<String> givenTargetFields(){
+    return List.of(ID_FIELD, PHYS_ID_FIELD);
   }
 }
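The S3Repository change in the patch above injects a DateTimeFormatter and prefixes the object key with the formatted current date. The bean itself is defined outside this excerpt; a plausible shape, assuming an ISO date prefix, looks like the following (formatting Instant.now() only works if the formatter carries a zone):

import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; the real bean lives elsewhere in the project.
@Configuration
public class FormatterConfiguration {

  @Bean
  public DateTimeFormatter dateTimeFormatter() {
    // Instant has no zone of its own, so attach one before formatting it
    return DateTimeFormatter.ISO_LOCAL_DATE.withZone(ZoneOffset.UTC);
  }
}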
From d84648c62d74053ba3de1f136ddbb3d3c7de101b Mon Sep 17 00:00:00 2001
From: southeo
Date: Tue, 29 Oct 2024 09:53:10 +0100
Subject: [PATCH 12/12] code review

---
 .../java/eu/dissco/exportjob/repository/S3Repository.java | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/src/main/java/eu/dissco/exportjob/repository/S3Repository.java b/src/main/java/eu/dissco/exportjob/repository/S3Repository.java
index 89c993f..6420315 100644
--- a/src/main/java/eu/dissco/exportjob/repository/S3Repository.java
+++ b/src/main/java/eu/dissco/exportjob/repository/S3Repository.java
@@ -18,18 +19,19 @@ public class S3Repository {
   private static final String BUCKET_NAME = "dissco-data-export";

   public String uploadResults(File file, UUID jobId) {
+    var key = getDate() +"/" + jobId.toString();
     try (var transferManager = S3TransferManager.builder().s3Client(s3Client).build()) {
       var upload = transferManager
           .uploadFile(uploadFileRequest -> uploadFileRequest
               .putObjectRequest(putObjectRequest -> putObjectRequest
                   .bucket(BUCKET_NAME)
-                  .key(getDate() + "/" + jobId.toString()))
+                  .key(key))
               .source(file));
       upload.completionFuture().join();
       return s3Client.utilities().getUrl(
           builder -> builder
               .bucket(BUCKET_NAME)
-              .key(jobId.toString()))
+              .key(key))
           .toString();
     }
   }
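With the review fix above, the upload and the returned URL share a single date-prefixed key, so the link reported back for the job points at the object that was actually uploaded. Roughly, under the same assumptions as the formatter sketch earlier (ISO date prefix; the exact URL shape depends on the S3 endpoint and region):

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.UUID;

class KeyLayoutSketch {

  public static void main(String[] args) {
    var formatter = DateTimeFormatter.ISO_LOCAL_DATE.withZone(ZoneOffset.UTC);
    var jobId = UUID.randomUUID();
    // e.g. "2024-10-29/1f8b3c1e-..."; the same key is used for the upload and for getUrl
    var key = formatter.format(Instant.now()) + "/" + jobId;
    System.out.println("https://dissco-data-export.s3.amazonaws.com/" + key);
  }
}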