From 4dd53d08f35ee04497f9344255b381774511989b Mon Sep 17 00:00:00 2001 From: Sam Leeflang Date: Tue, 1 Nov 2022 16:40:59 +0100 Subject: [PATCH] Add batching to processing service --- pom.xml | 8 +- .../KafkaConsumerConfiguration.java | 2 + .../controller/DigitalSpecimenController.java | 10 +- .../domain/DigitalSpecimenRecordEvent.java | 9 + .../domain/ProcessResult.java | 10 + .../domain/UpdatedDigitalSpecimenTuple.java | 6 + .../property/KafkaConsumerProperties.java | 4 + .../repository/DigitalSpecimenRepository.java | 27 ++- .../repository/ElasticSearchRepository.java | 20 +- .../service/HandleService.java | 11 +- .../service/KafkaConsumerService.java | 23 +- .../service/KafkaPublisherService.java | 4 +- .../service/ProcessingService.java | 229 +++++++++++++----- .../service/HandleServiceTest.java | 85 +++++++ .../service/KafkaConsumerServiceTest.java | 153 ++++++++++++ .../service/KafkaPublisherServiceTest.java | 65 +++++ .../service/ProcessingServiceTest.java | 137 +++++++++++ .../utils/TestUtils.java | 160 ++++++++++++ 18 files changed, 867 insertions(+), 96 deletions(-) create mode 100644 src/main/java/eu/dissco/core/digitalspecimenprocessor/domain/DigitalSpecimenRecordEvent.java create mode 100644 src/main/java/eu/dissco/core/digitalspecimenprocessor/domain/ProcessResult.java create mode 100644 src/main/java/eu/dissco/core/digitalspecimenprocessor/domain/UpdatedDigitalSpecimenTuple.java create mode 100644 src/test/java/eu/dissco/core/digitalspecimenprocessor/service/HandleServiceTest.java create mode 100644 src/test/java/eu/dissco/core/digitalspecimenprocessor/service/KafkaConsumerServiceTest.java create mode 100644 src/test/java/eu/dissco/core/digitalspecimenprocessor/service/KafkaPublisherServiceTest.java create mode 100644 src/test/java/eu/dissco/core/digitalspecimenprocessor/service/ProcessingServiceTest.java create mode 100644 src/test/java/eu/dissco/core/digitalspecimenprocessor/utils/TestUtils.java diff --git a/pom.xml b/pom.xml index 858a07a..dfef3b0 
100644 --- a/pom.xml +++ b/pom.xml @@ -109,10 +109,16 @@ spring-boot-starter-test test + + org.mockito + mockito-inline + 4.8.0 + test + - clean validate + clean package org.springframework.boot diff --git a/src/main/java/eu/dissco/core/digitalspecimenprocessor/configuration/KafkaConsumerConfiguration.java b/src/main/java/eu/dissco/core/digitalspecimenprocessor/configuration/KafkaConsumerConfiguration.java index 629b8e1..53f65b6 100644 --- a/src/main/java/eu/dissco/core/digitalspecimenprocessor/configuration/KafkaConsumerConfiguration.java +++ b/src/main/java/eu/dissco/core/digitalspecimenprocessor/configuration/KafkaConsumerConfiguration.java @@ -36,6 +36,7 @@ public ConsumerFactory consumerFactory() { props.put( ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, properties.getBatchSize()); return new DefaultKafkaConsumerFactory<>(props); } @@ -45,6 +46,7 @@ public ConsumerFactory consumerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); + factory.setBatchListener(true); return factory; } diff --git a/src/main/java/eu/dissco/core/digitalspecimenprocessor/controller/DigitalSpecimenController.java b/src/main/java/eu/dissco/core/digitalspecimenprocessor/controller/DigitalSpecimenController.java index 3fa96d7..ac3be38 100644 --- a/src/main/java/eu/dissco/core/digitalspecimenprocessor/controller/DigitalSpecimenController.java +++ b/src/main/java/eu/dissco/core/digitalspecimenprocessor/controller/DigitalSpecimenController.java @@ -5,6 +5,7 @@ import eu.dissco.core.digitalspecimenprocessor.domain.DigitalSpecimenRecord; import eu.dissco.core.digitalspecimenprocessor.exception.NoChangesFoundException; import eu.dissco.core.digitalspecimenprocessor.service.ProcessingService; +import java.util.List; import javax.xml.transform.TransformerException; import lombok.RequiredArgsConstructor; import 
lombok.extern.slf4j.Slf4j; @@ -31,10 +32,13 @@ public class DigitalSpecimenController { @PreAuthorize("isAuthenticated()") @PostMapping(consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE) public ResponseEntity upsertDigitalSpecimen(@RequestBody - DigitalSpecimenEvent event) throws TransformerException, NoChangesFoundException { + DigitalSpecimenEvent event) throws NoChangesFoundException { log.info("Received digitalSpecimen upsert: {}", event); - var result = processingService.handleMessages(event); - return ResponseEntity.status(HttpStatus.CREATED).body(result); + var result = processingService.handleMessages(List.of(event)); + if (result.isEmpty()){ + throw new NoChangesFoundException("No changes found for specimen"); + } + return ResponseEntity.status(HttpStatus.CREATED).body(result.get(0)); } @ExceptionHandler(NoChangesFoundException.class) diff --git a/src/main/java/eu/dissco/core/digitalspecimenprocessor/domain/DigitalSpecimenRecordEvent.java b/src/main/java/eu/dissco/core/digitalspecimenprocessor/domain/DigitalSpecimenRecordEvent.java new file mode 100644 index 0000000..f1325a2 --- /dev/null +++ b/src/main/java/eu/dissco/core/digitalspecimenprocessor/domain/DigitalSpecimenRecordEvent.java @@ -0,0 +1,9 @@ +package eu.dissco.core.digitalspecimenprocessor.domain; + +import java.util.List; + +public record DigitalSpecimenRecordEvent( + List enrichmentList, + DigitalSpecimenRecord digitalSpecimenRecord) { + +} diff --git a/src/main/java/eu/dissco/core/digitalspecimenprocessor/domain/ProcessResult.java b/src/main/java/eu/dissco/core/digitalspecimenprocessor/domain/ProcessResult.java new file mode 100644 index 0000000..ff35ba5 --- /dev/null +++ b/src/main/java/eu/dissco/core/digitalspecimenprocessor/domain/ProcessResult.java @@ -0,0 +1,10 @@ +package eu.dissco.core.digitalspecimenprocessor.domain; + +import java.util.List; + +public record ProcessResult( + List equalSpecimens, + List changedSpecimens, + List newSpecimens) { + 
+} diff --git a/src/main/java/eu/dissco/core/digitalspecimenprocessor/domain/UpdatedDigitalSpecimenTuple.java b/src/main/java/eu/dissco/core/digitalspecimenprocessor/domain/UpdatedDigitalSpecimenTuple.java new file mode 100644 index 0000000..b26e3b5 --- /dev/null +++ b/src/main/java/eu/dissco/core/digitalspecimenprocessor/domain/UpdatedDigitalSpecimenTuple.java @@ -0,0 +1,6 @@ +package eu.dissco.core.digitalspecimenprocessor.domain; + +public record UpdatedDigitalSpecimenTuple(DigitalSpecimenRecord currentSpecimen, + DigitalSpecimen digitalSpecimen) { + +} diff --git a/src/main/java/eu/dissco/core/digitalspecimenprocessor/property/KafkaConsumerProperties.java b/src/main/java/eu/dissco/core/digitalspecimenprocessor/property/KafkaConsumerProperties.java index ff771bf..17f9335 100644 --- a/src/main/java/eu/dissco/core/digitalspecimenprocessor/property/KafkaConsumerProperties.java +++ b/src/main/java/eu/dissco/core/digitalspecimenprocessor/property/KafkaConsumerProperties.java @@ -2,6 +2,7 @@ import eu.dissco.core.digitalspecimenprocessor.Profiles; import javax.validation.constraints.NotBlank; +import javax.validation.constraints.Positive; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Profile; @@ -23,4 +24,7 @@ public class KafkaConsumerProperties { @NotBlank private String topic; + @Positive + private int batchSize = 5000; + } diff --git a/src/main/java/eu/dissco/core/digitalspecimenprocessor/repository/DigitalSpecimenRepository.java b/src/main/java/eu/dissco/core/digitalspecimenprocessor/repository/DigitalSpecimenRepository.java index 2ed32ba..6122259 100644 --- a/src/main/java/eu/dissco/core/digitalspecimenprocessor/repository/DigitalSpecimenRepository.java +++ b/src/main/java/eu/dissco/core/digitalspecimenprocessor/repository/DigitalSpecimenRepository.java @@ -7,10 +7,13 @@ import eu.dissco.core.digitalspecimenprocessor.domain.DigitalSpecimen; import 
eu.dissco.core.digitalspecimenprocessor.domain.DigitalSpecimenRecord; import java.time.Instant; +import java.util.Collection; +import java.util.List; import java.util.Optional; import lombok.RequiredArgsConstructor; import org.jooq.DSLContext; import org.jooq.JSONB; +import org.jooq.Query; import org.jooq.Record; import org.springframework.stereotype.Repository; @@ -55,7 +58,12 @@ private DigitalSpecimenRecord mapDigitalSpecimen(Record dbRecord) { digitalSpecimen); } - public int createDigitalSpecimenRecord(DigitalSpecimenRecord digitalSpecimenRecord) { + public int[] createDigitalSpecimenRecord(Collection digitalSpecimenRecords) { + var queries = digitalSpecimenRecords.stream().map(this::specimenToQuery).toList(); + return context.batch(queries).execute(); + } + + private Query specimenToQuery(DigitalSpecimenRecord digitalSpecimenRecord) { return context.insertInto(NEW_DIGITAL_SPECIMEN) .set(NEW_DIGITAL_SPECIMEN.ID, digitalSpecimenRecord.id()) .set(NEW_DIGITAL_SPECIMEN.TYPE, digitalSpecimenRecord.digitalSpecimen().type()) @@ -79,13 +87,22 @@ public int createDigitalSpecimenRecord(DigitalSpecimenRecord digitalSpecimenReco JSONB.valueOf(digitalSpecimenRecord.digitalSpecimen().data().toString())) .set(NEW_DIGITAL_SPECIMEN.ORIGINAL_DATA, JSONB.valueOf(digitalSpecimenRecord.digitalSpecimen().originalData().toString())) - .set(NEW_DIGITAL_SPECIMEN.DWCA_ID, digitalSpecimenRecord.digitalSpecimen().dwcaId()) - .execute(); + .set(NEW_DIGITAL_SPECIMEN.DWCA_ID, digitalSpecimenRecord.digitalSpecimen().dwcaId()); } - public int updateLastChecked(DigitalSpecimenRecord currentDigitalSpecimen) { + public int updateLastChecked(List currentDigitalSpecimen) { return context.update(NEW_DIGITAL_SPECIMEN) .set(NEW_DIGITAL_SPECIMEN.LAST_CHECKED, Instant.now()) - .where(NEW_DIGITAL_SPECIMEN.ID.eq(currentDigitalSpecimen.id())).execute(); + .where(NEW_DIGITAL_SPECIMEN.ID.in(currentDigitalSpecimen)) + .execute(); + } + + public List getDigitalSpecimens(List specimenList) { + return 
context.select(NEW_DIGITAL_SPECIMEN.asterisk()) + .distinctOn(NEW_DIGITAL_SPECIMEN.ID) + .from(NEW_DIGITAL_SPECIMEN) + .where(NEW_DIGITAL_SPECIMEN.PHYSICAL_SPECIMEN_ID.in(specimenList)) + .orderBy(NEW_DIGITAL_SPECIMEN.ID, NEW_DIGITAL_SPECIMEN.VERSION.desc()) + .fetch(this::mapDigitalSpecimen); } } diff --git a/src/main/java/eu/dissco/core/digitalspecimenprocessor/repository/ElasticSearchRepository.java b/src/main/java/eu/dissco/core/digitalspecimenprocessor/repository/ElasticSearchRepository.java index 77d75c3..aa7afb0 100644 --- a/src/main/java/eu/dissco/core/digitalspecimenprocessor/repository/ElasticSearchRepository.java +++ b/src/main/java/eu/dissco/core/digitalspecimenprocessor/repository/ElasticSearchRepository.java @@ -1,10 +1,12 @@ package eu.dissco.core.digitalspecimenprocessor.repository; import co.elastic.clients.elasticsearch.ElasticsearchClient; -import co.elastic.clients.elasticsearch.core.IndexResponse; +import co.elastic.clients.elasticsearch.core.BulkRequest; +import co.elastic.clients.elasticsearch.core.BulkResponse; import eu.dissco.core.digitalspecimenprocessor.domain.DigitalSpecimenRecord; import eu.dissco.core.digitalspecimenprocessor.property.ElasticSearchProperties; import java.io.IOException; +import java.util.Collection; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Repository; @@ -15,11 +17,17 @@ public class ElasticSearchRepository { private final ElasticsearchClient client; private final ElasticSearchProperties properties; - public IndexResponse indexDigitalSpecimen(DigitalSpecimenRecord digitalSpecimen) { - try { - return client.index(idx -> idx.index(properties.getIndexName()).id(digitalSpecimen.id()).document(digitalSpecimen)); - } catch (IOException e) { - throw new RuntimeException(e); + public BulkResponse indexDigitalSpecimen(Collection digitalSpecimens) + throws IOException { + var bulkRequest = new BulkRequest.Builder(); + for (var digitalSpecimen : digitalSpecimens) { + bulkRequest.operations(op 
-> + op.index(idx -> idx + .index(properties.getIndexName()) + .id(digitalSpecimen.id()) + .document(digitalSpecimen)) + ); } + return client.bulk(bulkRequest.build()); } } diff --git a/src/main/java/eu/dissco/core/digitalspecimenprocessor/service/HandleService.java b/src/main/java/eu/dissco/core/digitalspecimenprocessor/service/HandleService.java index 69ac200..5909e58 100644 --- a/src/main/java/eu/dissco/core/digitalspecimenprocessor/service/HandleService.java +++ b/src/main/java/eu/dissco/core/digitalspecimenprocessor/service/HandleService.java @@ -3,6 +3,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import eu.dissco.core.digitalspecimenprocessor.domain.DigitalSpecimen; import eu.dissco.core.digitalspecimenprocessor.domain.HandleAttribute; +import eu.dissco.core.digitalspecimenprocessor.domain.UpdatedDigitalSpecimenTuple; import eu.dissco.core.digitalspecimenprocessor.repository.HandleRepository; import java.io.StringWriter; import java.nio.charset.StandardCharsets; @@ -30,7 +31,7 @@ public class HandleService { private static final String PREFIX = "20.5000.1025/"; private final Random random; - private final char[] symbols = "ABCDEFGHJKLMNPQRSTUVWXYZ1234567890".toCharArray(); + private final char[] symbols = "ABCDEFGHJKLMNPQRSTVWXYZ1234567890".toCharArray(); private final char[] buffer = new char[11]; private final ObjectMapper mapper; private final DocumentBuilder documentBuilder; @@ -144,7 +145,7 @@ private int toDigit(char hexChar) { return digit; } - public void updateHandle(String id, DigitalSpecimen digitalSpecimen) { + private void updateHandle(String id, DigitalSpecimen digitalSpecimen) { var handleAttributes = updatedHandles(digitalSpecimen); var recordTimestamp = Instant.now(); repository.updateHandleAttributes(id, recordTimestamp, handleAttributes); @@ -158,4 +159,10 @@ private List updatedHandles(DigitalSpecimen digitalSpecimen) { digitalSpecimen.organizationId(), "ROR", "Needs to be fixed!"))); return handleAttributes; } + + public void 
updateHandles(List handleUpdates) { + for (var handleUpdate : handleUpdates) { + updateHandle(handleUpdate.currentSpecimen().id(), handleUpdate.digitalSpecimen()); + } + } } diff --git a/src/main/java/eu/dissco/core/digitalspecimenprocessor/service/KafkaConsumerService.java b/src/main/java/eu/dissco/core/digitalspecimenprocessor/service/KafkaConsumerService.java index d1634d3..310e7a5 100644 --- a/src/main/java/eu/dissco/core/digitalspecimenprocessor/service/KafkaConsumerService.java +++ b/src/main/java/eu/dissco/core/digitalspecimenprocessor/service/KafkaConsumerService.java @@ -4,8 +4,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import eu.dissco.core.digitalspecimenprocessor.Profiles; import eu.dissco.core.digitalspecimenprocessor.domain.DigitalSpecimenEvent; -import eu.dissco.core.digitalspecimenprocessor.exception.NoChangesFoundException; -import javax.xml.transform.TransformerException; +import java.util.List; +import java.util.Objects; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Profile; @@ -23,14 +23,17 @@ public class KafkaConsumerService { private final ProcessingService processingService; @KafkaListener(topics = "${kafka.consumer.topic}") - public void getMessages(@Payload String message) - throws JsonProcessingException, TransformerException { - var event = mapper.readValue(message, DigitalSpecimenEvent.class); - try { - processingService.handleMessages(event); - } catch (NoChangesFoundException e) { - log.info(e.getMessage()); - } + public void getMessages(@Payload List messages) { + var events = messages.stream().map(message -> { + try { + return mapper.readValue(message, DigitalSpecimenEvent.class); + } catch (JsonProcessingException e) { + log.error("Failed to process message", e); + // TODO move message to DLQ + return null; + } + }).filter(Objects::nonNull).toList(); + processingService.handleMessages(events); } } diff --git 
a/src/main/java/eu/dissco/core/digitalspecimenprocessor/service/KafkaPublisherService.java b/src/main/java/eu/dissco/core/digitalspecimenprocessor/service/KafkaPublisherService.java index d6a823d..3d4a4f5 100644 --- a/src/main/java/eu/dissco/core/digitalspecimenprocessor/service/KafkaPublisherService.java +++ b/src/main/java/eu/dissco/core/digitalspecimenprocessor/service/KafkaPublisherService.java @@ -39,8 +39,8 @@ public void publishAnnotationRequestEvent(String enrichment, } } - public void publishUpdateEvent(DigitalSpecimenRecord currentDigitalSpecimen, - DigitalSpecimenRecord digitalSpecimenRecord) { + public void publishUpdateEvent(DigitalSpecimenRecord digitalSpecimenRecord, + DigitalSpecimenRecord currentDigitalSpecimen) { var jsonPatch = createJsonPatch(currentDigitalSpecimen, digitalSpecimenRecord); var event = new CreateUpdateDeleteEvent(UUID.randomUUID(), "update", "processing-service", digitalSpecimenRecord.id(), Instant.now(), jsonPatch, diff --git a/src/main/java/eu/dissco/core/digitalspecimenprocessor/service/ProcessingService.java b/src/main/java/eu/dissco/core/digitalspecimenprocessor/service/ProcessingService.java index bc5f889..b44e2db 100644 --- a/src/main/java/eu/dissco/core/digitalspecimenprocessor/service/ProcessingService.java +++ b/src/main/java/eu/dissco/core/digitalspecimenprocessor/service/ProcessingService.java @@ -1,13 +1,22 @@ package eu.dissco.core.digitalspecimenprocessor.service; +import co.elastic.clients.elasticsearch.core.BulkResponse; import eu.dissco.core.digitalspecimenprocessor.domain.DigitalSpecimen; import eu.dissco.core.digitalspecimenprocessor.domain.DigitalSpecimenEvent; import eu.dissco.core.digitalspecimenprocessor.domain.DigitalSpecimenRecord; -import eu.dissco.core.digitalspecimenprocessor.exception.NoChangesFoundException; +import eu.dissco.core.digitalspecimenprocessor.domain.ProcessResult; +import eu.dissco.core.digitalspecimenprocessor.domain.UpdatedDigitalSpecimenTuple; import 
eu.dissco.core.digitalspecimenprocessor.repository.DigitalSpecimenRepository; import eu.dissco.core.digitalspecimenprocessor.repository.ElasticSearchRepository; +import java.io.IOException; import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; import javax.xml.transform.TransformerException; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -18,66 +27,116 @@ @RequiredArgsConstructor public class ProcessingService { - private static final int SUCCESS = 1; - private final DigitalSpecimenRepository repository; private final HandleService handleService; private final ElasticSearchRepository elasticRepository; private final KafkaPublisherService kafkaService; - public DigitalSpecimenRecord handleMessages(DigitalSpecimenEvent event) - throws TransformerException, NoChangesFoundException { - var digitalSpecimen = event.digitalSpecimen(); - log.info("ds: {}", digitalSpecimen); - var currentDigitalSpecimenOptional = repository.getDigitalSpecimen( - digitalSpecimen.physicalSpecimenId()); - if (currentDigitalSpecimenOptional.isEmpty()) { - log.info("Specimen with id: {} is completely new", digitalSpecimen.physicalSpecimenId()); - return persistNewDigitalSpecimen(digitalSpecimen, event.enrichmentList()); - } else { - var currentDigitalSpecimen = currentDigitalSpecimenOptional.get(); - if (currentDigitalSpecimen.digitalSpecimen().equals(digitalSpecimen)) { - log.info("Received digital specimen is equal to digital specimen: {}", - currentDigitalSpecimen.id()); - processEqualDigitalSpecimen(currentDigitalSpecimen); - throw new NoChangesFoundException("No changes were necessary to specimen with id: " + currentDigitalSpecimen.id()); + public List handleMessages(List events) { + log.info("Processing {} digital specimen", events.size()); + var processResult = processSpecimens(events); + 
var results = new ArrayList(); + if (!processResult.equalSpecimens().isEmpty()) { + updateEqualSpecimen(processResult.equalSpecimens()); + } + if (!processResult.newSpecimens().isEmpty()) { + results.addAll(createNewDigitalSpecimen(processResult.newSpecimens())); + } + if (!processResult.changedSpecimens().isEmpty()) { + results.addAll(updateExistingDigitalSpecimen(processResult.changedSpecimens())); + } + return results; + } + + private ProcessResult processSpecimens(List events) { + var currentSpecimens = getCurrentSpecimen(events); + var equalSpecimens = new ArrayList(); + var changedSpecimens = new ArrayList(); + var newSpecimens = new ArrayList(); + + for (DigitalSpecimenEvent event : events) { + var digitalSpecimen = event.digitalSpecimen(); + log.debug("ds: {}", digitalSpecimen); + if (!currentSpecimens.containsKey(digitalSpecimen.physicalSpecimenId())) { + log.debug("Specimen with id: {} is completely new", digitalSpecimen.physicalSpecimenId()); + newSpecimens.add(event); } else { - log.info("Specimen with id: {} has received an update", currentDigitalSpecimen.id()); - return updateExistingDigitalSpecimen(currentDigitalSpecimen, digitalSpecimen); + var currentDigitalSpecimen = currentSpecimens.get(digitalSpecimen.physicalSpecimenId()); + if (currentDigitalSpecimen.digitalSpecimen().equals(digitalSpecimen)) { + log.debug("Received digital specimen is equal to digital specimen: {}", + currentDigitalSpecimen.id()); + equalSpecimens.add(currentDigitalSpecimen); + } else { + log.debug("Specimen with id: {} has received an update", currentDigitalSpecimen.id()); + changedSpecimens.add( + new UpdatedDigitalSpecimenTuple(currentDigitalSpecimen, digitalSpecimen)); + } } } + return new ProcessResult(equalSpecimens, changedSpecimens, newSpecimens); } - private void processEqualDigitalSpecimen(DigitalSpecimenRecord currentDigitalSpecimen) { - var result = repository.updateLastChecked(currentDigitalSpecimen); - if (result == SUCCESS) { - log.info("Successfully updated 
lastChecked for existing digitalSpecimen: {}", - currentDigitalSpecimen.id()); - } + private Map getCurrentSpecimen(List events) { + return repository.getDigitalSpecimens( + events.stream().map(event -> event.digitalSpecimen().physicalSpecimenId()).toList()) + .stream().collect( + Collectors.toMap( + specimenRecord -> specimenRecord.digitalSpecimen().physicalSpecimenId(), + Function.identity())); } - private DigitalSpecimenRecord updateExistingDigitalSpecimen( - DigitalSpecimenRecord currentDigitalSpecimen, - DigitalSpecimen digitalSpecimen) { - if (handleNeedsUpdate(currentDigitalSpecimen.digitalSpecimen(), digitalSpecimen)) { - handleService.updateHandle(currentDigitalSpecimen.id(), digitalSpecimen); - } - var id = currentDigitalSpecimen.id(); - var midsLevel = calculateMidsLevel(digitalSpecimen); - var version = currentDigitalSpecimen.version() + 1; - var digitalSpecimenRecord = new DigitalSpecimenRecord(id, midsLevel, version, Instant.now(), - digitalSpecimen); - var result = repository.createDigitalSpecimenRecord(digitalSpecimenRecord); - if (result == SUCCESS) { - log.info("Specimen: {} has been successfully updated in the database", id); - var indexDocument = elasticRepository.indexDigitalSpecimen(digitalSpecimenRecord); - if (indexDocument.result().jsonValue().equals("updated")) { - log.info("Specimen: {} has been successfully indexed", id); - kafkaService.publishUpdateEvent(currentDigitalSpecimen, digitalSpecimenRecord); + private void updateEqualSpecimen(List currentDigitalSpecimen) { + var currentIds = currentDigitalSpecimen.stream().map(DigitalSpecimenRecord::id).toList(); + repository.updateLastChecked(currentIds); + log.info("Successfully updated lastChecked for {} existing digitalSpecimen", + currentDigitalSpecimen.size()); + } + + private Set updateExistingDigitalSpecimen( + List updatedDigitalSpecimenTuples) { + var handleUpdates = updatedDigitalSpecimenTuples.stream().filter( + tuple -> handleNeedsUpdate(tuple.currentSpecimen().digitalSpecimen(), 
+ tuple.digitalSpecimen())).toList(); + handleService.updateHandles(handleUpdates); + + var digitalSpecimenRecords = updatedDigitalSpecimenTuples.stream().collect(Collectors.toMap( + tuple -> new DigitalSpecimenRecord( + tuple.currentSpecimen().id(), + calculateMidsLevel(tuple.digitalSpecimen()), + tuple.currentSpecimen().version() + 1, + Instant.now(), + tuple.digitalSpecimen() + ), UpdatedDigitalSpecimenTuple::currentSpecimen)); + log.info("Persisting to db"); + repository.createDigitalSpecimenRecord(digitalSpecimenRecords.keySet()); + log.info("Persisting to elastic"); + BulkResponse bulkResponse = null; + try { + bulkResponse = elasticRepository.indexDigitalSpecimen(digitalSpecimenRecords.keySet()); + if (!bulkResponse.errors()) { + log.debug("Successfully indexed {} specimens", digitalSpecimenRecords); + digitalSpecimenRecords.forEach(kafkaService::publishUpdateEvent); + } else { + var digitalSpecimenMap = digitalSpecimenRecords.values().stream() + .collect(Collectors.toMap(DigitalSpecimenRecord::id, Function.identity())); + bulkResponse.items().forEach( + item -> { + var digitalSpecimenRecord = digitalSpecimenMap.get(item.id()); + if (item.error() != null) { + // TODO Rollback database (remove version) and move message to DLQ + digitalSpecimenRecords.remove(digitalSpecimenRecord); + } else { + kafkaService.publishUpdateEvent(digitalSpecimenRecord, + digitalSpecimenRecords.get(digitalSpecimenRecord)); + } + } + ); } + log.info("Successfully updated {} digitalSpecimen", updatedDigitalSpecimenTuples.size()); + return digitalSpecimenRecords.keySet(); + } catch (IOException e) { + throw new RuntimeException(e); } - log.info("Successfully updated digital specimen with id: {}", id); - return digitalSpecimenRecord; } private boolean handleNeedsUpdate(DigitalSpecimen currentDigitalSpecimen, @@ -86,28 +145,64 @@ private boolean handleNeedsUpdate(DigitalSpecimen currentDigitalSpecimen, !currentDigitalSpecimen.organizationId().equals(digitalSpecimen.organizationId()); } 
- private DigitalSpecimenRecord persistNewDigitalSpecimen(DigitalSpecimen digitalSpecimen, - List enrichmentList) - throws TransformerException { - var id = handleService.createNewHandle(digitalSpecimen); - log.info("New id has been generated: {}", id); - var midsLevel = calculateMidsLevel(digitalSpecimen); - var digitalSpecimenRecord = new DigitalSpecimenRecord(id, midsLevel, 1, Instant.now(), - digitalSpecimen); - var result = repository.createDigitalSpecimenRecord(digitalSpecimenRecord); - if (result == SUCCESS) { - log.info("Specimen: {} has been successfully committed to database", id); - var indexDocument = elasticRepository.indexDigitalSpecimen(digitalSpecimenRecord); - if (indexDocument.result().jsonValue().equals("created")) { - log.info("Specimen: {} has been successfully indexed", id); - kafkaService.publishCreateEvent(digitalSpecimenRecord); - for (var enrichment : enrichmentList) { - kafkaService.publishAnnotationRequestEvent(enrichment, digitalSpecimenRecord); - } + private Set createNewDigitalSpecimen(List events) { + var digitalSpecimenRecords = events.stream().collect(Collectors.toMap( + event -> { + try { + return new DigitalSpecimenRecord( + handleService.createNewHandle(event.digitalSpecimen()), + calculateMidsLevel(event.digitalSpecimen()), + 1, + Instant.now(), + event.digitalSpecimen() + ); + } catch (TransformerException e) { + log.error("Failed to process record with id: {}", + event.digitalSpecimen().physicalSpecimenId(), + e); + return null; + } + }, + DigitalSpecimenEvent::enrichmentList + )); + digitalSpecimenRecords.remove(null); + if (digitalSpecimenRecords.isEmpty()) { + return Collections.emptySet(); + } + repository.createDigitalSpecimenRecord(digitalSpecimenRecords.keySet()); + + try { + var bulkResponse = elasticRepository.indexDigitalSpecimen(digitalSpecimenRecords.keySet()); + if (!bulkResponse.errors()) { + log.debug("Successfully indexed {} specimens", digitalSpecimenRecords); + digitalSpecimenRecords.forEach((key, value) -> 
{ + kafkaService.publishCreateEvent(key); + value.forEach(aas -> kafkaService.publishAnnotationRequestEvent(aas, key)); + }); + } else { + var digitalSpecimenMap = digitalSpecimenRecords.keySet().stream() + .collect(Collectors.toMap(DigitalSpecimenRecord::id, Function.identity())); + bulkResponse.items().forEach( + item -> { + var digitalSpecimenRecord = digitalSpecimenMap.get(item.id()); + if (item.error() != null) { + // TODO Rollback database and handle and move message to DLQ + digitalSpecimenRecords.remove(digitalSpecimenRecord); + } else { + kafkaService.publishCreateEvent(digitalSpecimenRecord); + digitalSpecimenRecords.get(digitalSpecimenRecord).forEach( + aas -> kafkaService.publishAnnotationRequestEvent(aas, digitalSpecimenRecord)); + } + } + ); } + log.info("Successfully created {} new digitalSpecimen", digitalSpecimenRecords.size()); + return digitalSpecimenRecords.keySet(); + } catch (IOException e) { + // TODO rollback all items from database and remove handles + throw new RuntimeException(e); } - log.info("Successfully created digital specimen with id: {}", id); - return digitalSpecimenRecord; + } private int calculateMidsLevel(DigitalSpecimen digitalSpecimen) { diff --git a/src/test/java/eu/dissco/core/digitalspecimenprocessor/service/HandleServiceTest.java b/src/test/java/eu/dissco/core/digitalspecimenprocessor/service/HandleServiceTest.java new file mode 100644 index 0000000..3576bbd --- /dev/null +++ b/src/test/java/eu/dissco/core/digitalspecimenprocessor/service/HandleServiceTest.java @@ -0,0 +1,85 @@ +package eu.dissco.core.digitalspecimenprocessor.service; + +import static eu.dissco.core.digitalspecimenprocessor.utils.TestUtils.CREATED; +import static eu.dissco.core.digitalspecimenprocessor.utils.TestUtils.HANDLE; +import static eu.dissco.core.digitalspecimenprocessor.utils.TestUtils.MAPPER; +import static eu.dissco.core.digitalspecimenprocessor.utils.TestUtils.givenDigitalSpecimen; +import static 
  @BeforeEach
  void setup() throws ParserConfigurationException {
    // Build the service under test with a fresh XML DocumentBuilder; the Random
    // and HandleRepository are Mockito mocks.
    var docFactory = DocumentBuilderFactory.newInstance();
    service = new HandleService(random, MAPPER, docFactory.newDocumentBuilder(), repository);
    // Pin Instant.now() to the fixed CREATED timestamp so generated handle
    // attributes and repository interactions are deterministic across runs.
    // The static mock is released in destroy().
    Clock clock = Clock.fixed(CREATED, ZoneOffset.UTC);
    Instant instant = Instant.now(clock);
    mockedStatic = mockStatic(Instant.class);
    mockedStatic.when(Instant::now).thenReturn(instant);
  }
  @Test
  void testGetMessages() {
    // Given: one valid serialized DigitalSpecimenEvent
    var message = givenMessage();

    // When: the batch listener receives it
    service.getMessages(List.of(message));

    // Then: it is deserialized and forwarded to the processing service as a batch
    then(processingService).should().handleMessages(List.of(givenDigitalSpecimenEvent()));
  }
+ + @Test + void testGetInvalidMessages() { + // Given + var message = givenInvalidMessage(); + + // When + service.getMessages(List.of(message)); + + // Then + then(processingService).should().handleMessages(List.of()); + + } + + private String givenInvalidMessage() { + return """ + { + "enrichmentList": [ + "OCR" + ], + "digitalSpecimen": { + "type": "GeologyRockSpecimen", + "physicalSpecimenId": "https://geocollections.info/specimen/23602", + "physicalSpecimenIdType": "cetaf", + "specimenName": "Biota, + "organizationId": "https://ror.org/0443cwa12", + "datasetId": null, + "physicalSpecimenCollection": null, + "sourceSystemId": "20.5000.1025/MN0-5XP-FFD", + "data": {}, + "originalData": {}, + "dwcaId": null + } + }"""; + } + + private String givenMessage() { + return """ + { + "enrichmentList": [ + "OCR" + ], + "digitalSpecimen": { + "type": "GeologyRockSpecimen", + "physicalSpecimenId": "https://geocollections.info/specimen/23602", + "physicalSpecimenIdType": "cetaf", + "specimenName": "Biota", + "organizationId": "https://ror.org/0443cwa12", + "datasetId": null, + "physicalSpecimenCollection": null, + "sourceSystemId": "20.5000.1025/MN0-5XP-FFD", + "data": { + "abcd:unitID": "152-4972", + "abcd:sourceID": "GIT", + "abcd:recordURI": "https://geocollections.info/specimen/23646", + "abcd:recordBasis": "FossilSpecimen", + "abcd:unitIDNumeric": 23646, + "abcd:dateLastEdited": "2004-06-09T10:17:54.000+00:00", + "abcd:kindOfUnit/0/value": "", + "abcd:sourceInstitutionID": "Department of Geology, TalTech", + "abcd:kindOfUnit/0/language": "en", + "abcd:gathering/country/name/value": "Estonia", + "abcd:gathering/localityText/value": "Laeva 297 borehole", + "abcd:gathering/country/iso3166Code": "EE", + "abcd:gathering/localityText/language": "en", + "abcd:gathering/altitude/measurementOrFactText/value": "39.9", + "abcd:identifications/identification/0/preferredFlag": true, + "abcd:gathering/depth/measurementOrFactAtomised/lowerValue/value": "165", + 
"abcd:gathering/depth/measurementOrFactAtomised/unitOfMeasurement": "m", + "abcd:gathering/siteCoordinateSets/siteCoordinates/0/coordinatesLatLong/spatialDatum": "WGS84", + "abcd:gathering/stratigraphy/chronostratigraphicTerms/chronostratigraphicTerm/0/term": "Pirgu Stage", + "abcd:gathering/stratigraphy/chronostratigraphicTerms/chronostratigraphicTerm/1/term": "Katian", + "abcd:gathering/siteCoordinateSets/siteCoordinates/0/coordinatesLatLong/latitudeDecimal": 58.489269, + "abcd:gathering/siteCoordinateSets/siteCoordinates/0/coordinatesLatLong/longitudeDecimal": 26.385719, + "abcd:gathering/stratigraphy/chronostratigraphicTerms/chronostratigraphicTerm/0/language": "en", + "abcd:gathering/stratigraphy/chronostratigraphicTerms/chronostratigraphicTerm/1/language": "en", + "abcd-efg:earthScienceSpecimen/unitStratigraphicDetermination/chronostratigraphicAttributions/chronostratigraphicAttribution/0/chronostratigraphicName": "Pirgu Stage", + "abcd-efg:earthScienceSpecimen/unitStratigraphicDetermination/chronostratigraphicAttributions/chronostratigraphicAttribution/0/chronoStratigraphicDivision": "Stage" + }, + "originalData": { + "abcd:unitID": "152-4972", + "abcd:sourceID": "GIT", + "abcd:unitGUID": "https://geocollections.info/specimen/23646", + "abcd:recordURI": "https://geocollections.info/specimen/23646", + "abcd:recordBasis": "FossilSpecimen", + "abcd:unitIDNumeric": 23646, + "abcd:dateLastEdited": "2004-06-09T10:17:54.000+00:00", + "abcd:kindOfUnit/0/value": "", + "abcd:sourceInstitutionID": "Department of Geology, TalTech", + "abcd:kindOfUnit/0/language": "en", + "abcd:gathering/country/name/value": "Estonia", + "abcd:gathering/localityText/value": "Laeva 297 borehole", + "abcd:gathering/country/iso3166Code": "EE", + "abcd:gathering/localityText/language": "en", + "abcd:gathering/altitude/measurementOrFactText/value": "39.9", + "abcd:identifications/identification/0/preferredFlag": true, + "abcd:gathering/depth/measurementOrFactAtomised/lowerValue/value": "165", 
+ "abcd:gathering/depth/measurementOrFactAtomised/unitOfMeasurement": "m", + "abcd:gathering/siteCoordinateSets/siteCoordinates/0/coordinatesLatLong/spatialDatum": "WGS84", + "abcd:gathering/stratigraphy/chronostratigraphicTerms/chronostratigraphicTerm/0/term": "Pirgu Stage", + "abcd:gathering/stratigraphy/chronostratigraphicTerms/chronostratigraphicTerm/1/term": "Katian", + "abcd:gathering/siteCoordinateSets/siteCoordinates/0/coordinatesLatLong/latitudeDecimal": 58.489269, + "abcd:gathering/siteCoordinateSets/siteCoordinates/0/coordinatesLatLong/longitudeDecimal": 26.385719, + "abcd:gathering/stratigraphy/chronostratigraphicTerms/chronostratigraphicTerm/0/language": "en", + "abcd:gathering/stratigraphy/chronostratigraphicTerms/chronostratigraphicTerm/1/language": "en", + "abcd:identifications/identification/0/result/taxonIdentified/scientificName/fullScientificNameString": "Biota", + "abcd-efg:earthScienceSpecimen/unitStratigraphicDetermination/chronostratigraphicAttributions/chronostratigraphicAttribution/0/chronostratigraphicName": "Pirgu Stage", + "abcd-efg:earthScienceSpecimen/unitStratigraphicDetermination/chronostratigraphicAttributions/chronostratigraphicAttribution/0/chronoStratigraphicDivision": "Stage" + }, + "dwcaId": null + } + }"""; + } + + +} diff --git a/src/test/java/eu/dissco/core/digitalspecimenprocessor/service/KafkaPublisherServiceTest.java b/src/test/java/eu/dissco/core/digitalspecimenprocessor/service/KafkaPublisherServiceTest.java new file mode 100644 index 0000000..e2763f4 --- /dev/null +++ b/src/test/java/eu/dissco/core/digitalspecimenprocessor/service/KafkaPublisherServiceTest.java @@ -0,0 +1,65 @@ +package eu.dissco.core.digitalspecimenprocessor.service; + +import static eu.dissco.core.digitalspecimenprocessor.utils.TestUtils.AAS; +import static eu.dissco.core.digitalspecimenprocessor.utils.TestUtils.MAPPER; +import static eu.dissco.core.digitalspecimenprocessor.utils.TestUtils.givenDigitalSpecimenRecord; +import static 
eu.dissco.core.digitalspecimenprocessor.utils.TestUtils.givenUnequalDigitalSpecimenRecord; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.BDDMockito.then; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.kafka.core.KafkaTemplate; + +@ExtendWith(MockitoExtension.class) +class KafkaPublisherServiceTest { + + @Mock + private KafkaTemplate kafkaTemplate; + + private KafkaPublisherService service; + + + @BeforeEach + void setup() { + service = new KafkaPublisherService(MAPPER, kafkaTemplate); + } + + @Test + void testPublishCreateEvent() { + // Given + + // When + service.publishCreateEvent(givenDigitalSpecimenRecord()); + + // Then + then(kafkaTemplate).should().send(eq("createUpdateDeleteTopic"), anyString()); + } + + @Test + void testPublishAnnotationRequestEvent() { + // Given + + // When + service.publishAnnotationRequestEvent(AAS, givenDigitalSpecimenRecord()); + + // Then + then(kafkaTemplate).should().send(eq(AAS), anyString()); + } + + @Test + void testPublishUpdateEvent() { + // Given + + // When + service.publishUpdateEvent(givenDigitalSpecimenRecord(2), givenUnequalDigitalSpecimenRecord()); + + // Then + then(kafkaTemplate).should().send(eq("createUpdateDeleteTopic"), anyString()); + } + +} diff --git a/src/test/java/eu/dissco/core/digitalspecimenprocessor/service/ProcessingServiceTest.java b/src/test/java/eu/dissco/core/digitalspecimenprocessor/service/ProcessingServiceTest.java new file mode 100644 index 0000000..fd22c42 --- /dev/null +++ b/src/test/java/eu/dissco/core/digitalspecimenprocessor/service/ProcessingServiceTest.java @@ -0,0 +1,137 @@ +package eu.dissco.core.digitalspecimenprocessor.service; + +import static eu.dissco.core.digitalspecimenprocessor.utils.TestUtils.AAS; +import 
static eu.dissco.core.digitalspecimenprocessor.utils.TestUtils.CREATED; +import static eu.dissco.core.digitalspecimenprocessor.utils.TestUtils.HANDLE; +import static eu.dissco.core.digitalspecimenprocessor.utils.TestUtils.PHYSICAL_SPECIMEN_ID; +import static eu.dissco.core.digitalspecimenprocessor.utils.TestUtils.givenDigitalSpecimen; +import static eu.dissco.core.digitalspecimenprocessor.utils.TestUtils.givenDigitalSpecimenEvent; +import static eu.dissco.core.digitalspecimenprocessor.utils.TestUtils.givenDigitalSpecimenRecord; +import static eu.dissco.core.digitalspecimenprocessor.utils.TestUtils.givenUnequalDigitalSpecimenRecord; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.then; +import static org.mockito.Mockito.mockStatic; + +import co.elastic.clients.elasticsearch.core.BulkResponse; +import eu.dissco.core.digitalspecimenprocessor.domain.DigitalSpecimenRecord; +import eu.dissco.core.digitalspecimenprocessor.repository.DigitalSpecimenRepository; +import eu.dissco.core.digitalspecimenprocessor.repository.ElasticSearchRepository; +import java.io.IOException; +import java.time.Clock; +import java.time.Instant; +import java.time.ZoneOffset; +import java.util.List; +import java.util.Set; +import javax.xml.transform.TransformerException; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.MockedStatic; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class ProcessingServiceTest { + + @Mock + private DigitalSpecimenRepository repository; + @Mock + private HandleService handleService; + @Mock + private ElasticSearchRepository elasticRepository; + @Mock + private KafkaPublisherService kafkaService; + @Mock + private BulkResponse bulkResponse; + private MockedStatic 
mockedStatic; + + private ProcessingService service; + + @BeforeEach + void setup() { + service = new ProcessingService(repository, handleService, elasticRepository, kafkaService); + Clock clock = Clock.fixed(CREATED, ZoneOffset.UTC); + Instant instant = Instant.now(clock); + mockedStatic = mockStatic(Instant.class); + mockedStatic.when(Instant::now).thenReturn(instant); + } + + @AfterEach + void destroy() { + mockedStatic.close(); + } + + @Test + void testEqualSpecimen() { + // Given + given(repository.getDigitalSpecimens(List.of(PHYSICAL_SPECIMEN_ID))).willReturn( + List.of(givenDigitalSpecimenRecord())); + + // When + List result = service.handleMessages(List.of(givenDigitalSpecimenEvent())); + + // Then + then(repository).should().updateLastChecked(List.of(HANDLE)); + assertThat(result).isEmpty(); + } + + @Test + void testUnequalSpecimen() throws IOException { + // Given + var expected = Set.of(givenDigitalSpecimenRecord(2)); + given(repository.getDigitalSpecimens(List.of(PHYSICAL_SPECIMEN_ID))).willReturn( + List.of(givenUnequalDigitalSpecimenRecord())); + given(bulkResponse.errors()).willReturn(false); + given( + elasticRepository.indexDigitalSpecimen(expected)).willReturn( + bulkResponse); + + // When + var result = service.handleMessages(List.of(givenDigitalSpecimenEvent())); + + // Then + then(repository).should().createDigitalSpecimenRecord(expected); + then(kafkaService).should() + .publishUpdateEvent(givenDigitalSpecimenRecord(2), givenUnequalDigitalSpecimenRecord()); + assertThat(result).isEqualTo(List.of(givenDigitalSpecimenRecord(2))); + } + + @Test + void testNewSpecimen() throws TransformerException, IOException { + // Given + given(repository.getDigitalSpecimens(List.of(PHYSICAL_SPECIMEN_ID))).willReturn(List.of()); + given(handleService.createNewHandle(givenDigitalSpecimen())).willReturn(HANDLE); + given(bulkResponse.errors()).willReturn(false); + given( + elasticRepository.indexDigitalSpecimen(Set.of(givenDigitalSpecimenRecord()))).willReturn( + 
bulkResponse); + + // When + var result = service.handleMessages(List.of(givenDigitalSpecimenEvent())); + + // Then + then(repository).should().createDigitalSpecimenRecord(Set.of(givenDigitalSpecimenRecord())); + then(kafkaService).should().publishCreateEvent(givenDigitalSpecimenRecord()); + then(kafkaService).should().publishAnnotationRequestEvent(AAS, givenDigitalSpecimenRecord()); + assertThat(result).isEqualTo(List.of(givenDigitalSpecimenRecord())); + } + + @Test + void testNewSpecimenError() throws TransformerException { + // Given + given(repository.getDigitalSpecimens(List.of(PHYSICAL_SPECIMEN_ID))).willReturn(List.of()); + given(handleService.createNewHandle(givenDigitalSpecimen())).willThrow( + TransformerException.class); + + // When + var result = service.handleMessages(List.of(givenDigitalSpecimenEvent())); + + // Then + then(repository).shouldHaveNoMoreInteractions(); + then(elasticRepository).shouldHaveNoInteractions(); + assertThat(result).isEmpty(); + } + +} diff --git a/src/test/java/eu/dissco/core/digitalspecimenprocessor/utils/TestUtils.java b/src/test/java/eu/dissco/core/digitalspecimenprocessor/utils/TestUtils.java new file mode 100644 index 0000000..e5d2463 --- /dev/null +++ b/src/test/java/eu/dissco/core/digitalspecimenprocessor/utils/TestUtils.java @@ -0,0 +1,160 @@ +package eu.dissco.core.digitalspecimenprocessor.utils; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dissco.core.digitalspecimenprocessor.domain.DigitalSpecimen; +import eu.dissco.core.digitalspecimenprocessor.domain.DigitalSpecimenEvent; +import eu.dissco.core.digitalspecimenprocessor.domain.DigitalSpecimenRecord; +import java.time.Instant; +import java.util.List; + +public class TestUtils { + + public static ObjectMapper MAPPER = new ObjectMapper().findAndRegisterModules(); + public static String HANDLE = "20.5000.1025/V1Z-176-LL4"; + public static 
int MIDS_LEVEL = 1; + public static int VERSION = 1; + public static Instant CREATED = Instant.parse("2022-11-01T09:59:24.00Z"); + public static String AAS = "OCR"; + public static String TYPE = "GeologyRockSpecimen"; + public static String PHYSICAL_SPECIMEN_ID = "https://geocollections.info/specimen/23602"; + public static String PHYSICAL_SPECIMEN_TYPE = "cetaf"; + public static String SPECIMEN_NAME = "Biota"; + public static String ORGANIZATION_ID = "https://ror.org/0443cwa12"; + public static String DATASET_ID = null; + public static String PHYSICAL_SPECIMEN_COLLECTION = null; + public static String SOURCE_SYSTEM_ID = "20.5000.1025/MN0-5XP-FFD"; + public static JsonNode DATA = generateSpecimenData(); + public static JsonNode ORIGINAL_DATA = generateSpecimenOriginalData(); + public static String DWCA_ID = null; + + private static JsonNode generateSpecimenOriginalData() { + try { + return MAPPER.readTree( + """ + {"abcd:unitID": "152-4972", + "abcd:sourceID": "GIT", + "abcd:unitGUID": "https://geocollections.info/specimen/23646", + "abcd:recordURI": "https://geocollections.info/specimen/23646", + "abcd:recordBasis": "FossilSpecimen", + "abcd:unitIDNumeric": 23646, + "abcd:dateLastEdited": "2004-06-09T10:17:54.000+00:00", + "abcd:kindOfUnit/0/value": "", + "abcd:sourceInstitutionID": "Department of Geology, TalTech", + "abcd:kindOfUnit/0/language": "en", + "abcd:gathering/country/name/value": "Estonia", + "abcd:gathering/localityText/value": "Laeva 297 borehole", + "abcd:gathering/country/iso3166Code": "EE", + "abcd:gathering/localityText/language": "en", + "abcd:gathering/altitude/measurementOrFactText/value": "39.9", + "abcd:identifications/identification/0/preferredFlag": true, + "abcd:gathering/depth/measurementOrFactAtomised/lowerValue/value": "165", + "abcd:gathering/depth/measurementOrFactAtomised/unitOfMeasurement": "m", + "abcd:gathering/siteCoordinateSets/siteCoordinates/0/coordinatesLatLong/spatialDatum": "WGS84", + 
"abcd:gathering/stratigraphy/chronostratigraphicTerms/chronostratigraphicTerm/0/term": "Pirgu Stage", + "abcd:gathering/stratigraphy/chronostratigraphicTerms/chronostratigraphicTerm/1/term": "Katian", + "abcd:gathering/siteCoordinateSets/siteCoordinates/0/coordinatesLatLong/latitudeDecimal": 58.489269, + "abcd:gathering/siteCoordinateSets/siteCoordinates/0/coordinatesLatLong/longitudeDecimal": 26.385719, + "abcd:gathering/stratigraphy/chronostratigraphicTerms/chronostratigraphicTerm/0/language": "en", + "abcd:gathering/stratigraphy/chronostratigraphicTerms/chronostratigraphicTerm/1/language": "en", + "abcd:identifications/identification/0/result/taxonIdentified/scientificName/fullScientificNameString": "Biota", + "abcd-efg:earthScienceSpecimen/unitStratigraphicDetermination/chronostratigraphicAttributions/chronostratigraphicAttribution/0/chronostratigraphicName": "Pirgu Stage", + "abcd-efg:earthScienceSpecimen/unitStratigraphicDetermination/chronostratigraphicAttributions/chronostratigraphicAttribution/0/chronoStratigraphicDivision": "Stage" + }""" + ); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + + private static JsonNode generateSpecimenData() { + try { + return MAPPER.readTree( + """ + { + "abcd:unitID": "152-4972", + "abcd:sourceID": "GIT", + "abcd:recordURI": "https://geocollections.info/specimen/23646", + "abcd:recordBasis": "FossilSpecimen", + "abcd:unitIDNumeric": 23646, + "abcd:dateLastEdited": "2004-06-09T10:17:54.000+00:00", + "abcd:kindOfUnit/0/value": "", + "abcd:sourceInstitutionID": "Department of Geology, TalTech", + "abcd:kindOfUnit/0/language": "en", + "abcd:gathering/country/name/value": "Estonia", + "abcd:gathering/localityText/value": "Laeva 297 borehole", + "abcd:gathering/country/iso3166Code": "EE", + "abcd:gathering/localityText/language": "en", + "abcd:gathering/altitude/measurementOrFactText/value": "39.9", + "abcd:identifications/identification/0/preferredFlag": true, + 
"abcd:gathering/depth/measurementOrFactAtomised/lowerValue/value": "165", + "abcd:gathering/depth/measurementOrFactAtomised/unitOfMeasurement": "m", + "abcd:gathering/siteCoordinateSets/siteCoordinates/0/coordinatesLatLong/spatialDatum": "WGS84", + "abcd:gathering/stratigraphy/chronostratigraphicTerms/chronostratigraphicTerm/0/term": "Pirgu Stage", + "abcd:gathering/stratigraphy/chronostratigraphicTerms/chronostratigraphicTerm/1/term": "Katian", + "abcd:gathering/siteCoordinateSets/siteCoordinates/0/coordinatesLatLong/latitudeDecimal": 58.489269, + "abcd:gathering/siteCoordinateSets/siteCoordinates/0/coordinatesLatLong/longitudeDecimal": 26.385719, + "abcd:gathering/stratigraphy/chronostratigraphicTerms/chronostratigraphicTerm/0/language": "en", + "abcd:gathering/stratigraphy/chronostratigraphicTerms/chronostratigraphicTerm/1/language": "en", + "abcd-efg:earthScienceSpecimen/unitStratigraphicDetermination/chronostratigraphicAttributions/chronostratigraphicAttribution/0/chronostratigraphicName": "Pirgu Stage", + "abcd-efg:earthScienceSpecimen/unitStratigraphicDetermination/chronostratigraphicAttributions/chronostratigraphicAttribution/0/chronoStratigraphicDivision": "Stage" + }""" + ); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + + public static DigitalSpecimenRecord givenDigitalSpecimenRecord(int version) { + return new DigitalSpecimenRecord( + HANDLE, + MIDS_LEVEL, + version, + CREATED, + givenDigitalSpecimen() + ); + } + + public static DigitalSpecimenRecord givenDigitalSpecimenRecord() { + return givenDigitalSpecimenRecord(VERSION); + } + + public static DigitalSpecimenRecord givenUnequalDigitalSpecimenRecord() { + return new DigitalSpecimenRecord( + HANDLE, + MIDS_LEVEL, + VERSION, + CREATED, + givenDigitalSpecimen("Another SpecimenName") + ); + } + + public static DigitalSpecimenEvent givenDigitalSpecimenEvent() { + return new DigitalSpecimenEvent( + List.of(AAS), + givenDigitalSpecimen() + ); + } + + public static 
DigitalSpecimen givenDigitalSpecimen() { + return givenDigitalSpecimen(SPECIMEN_NAME); + } + + public static DigitalSpecimen givenDigitalSpecimen(String specimenName) { + return new DigitalSpecimen( + TYPE, + PHYSICAL_SPECIMEN_ID, + PHYSICAL_SPECIMEN_TYPE, + specimenName, + ORGANIZATION_ID, + DATASET_ID, + PHYSICAL_SPECIMEN_COLLECTION, + SOURCE_SYSTEM_ID, + DATA, + ORIGINAL_DATA, + DWCA_ID + ); + } + +}