diff --git a/pom.xml b/pom.xml index ca4d724..e9f2f17 100644 --- a/pom.xml +++ b/pom.xml @@ -146,7 +146,7 @@ - clean package + -B verify org.springframework.boot diff --git a/src/main/java/eu/dissco/core/digitalspecimenprocessor/configuration/ApplicationConfiguration.java b/src/main/java/eu/dissco/core/digitalspecimenprocessor/configuration/ApplicationConfiguration.java index 085980a..753e146 100644 --- a/src/main/java/eu/dissco/core/digitalspecimenprocessor/configuration/ApplicationConfiguration.java +++ b/src/main/java/eu/dissco/core/digitalspecimenprocessor/configuration/ApplicationConfiguration.java @@ -2,9 +2,11 @@ import com.fasterxml.jackson.databind.ObjectMapper; import java.util.Random; +import javax.xml.XMLConstants; import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilderFactory; import javax.xml.parsers.ParserConfigurationException; +import javax.xml.transform.TransformerFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -24,7 +26,16 @@ public Random random() { @Bean public DocumentBuilder documentBuilder() throws ParserConfigurationException { var docFactory = DocumentBuilderFactory.newInstance(); + docFactory.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true); return docFactory.newDocumentBuilder(); } + @Bean + public TransformerFactory transformerFactory() { + var factory = TransformerFactory.newInstance(); + factory.setAttribute(XMLConstants.ACCESS_EXTERNAL_DTD, ""); + factory.setAttribute(XMLConstants.ACCESS_EXTERNAL_STYLESHEET, ""); + return factory; + } + } diff --git a/src/main/java/eu/dissco/core/digitalspecimenprocessor/domain/HandleAttribute.java b/src/main/java/eu/dissco/core/digitalspecimenprocessor/domain/HandleAttribute.java index ea275d9..1fcfc2e 100644 --- a/src/main/java/eu/dissco/core/digitalspecimenprocessor/domain/HandleAttribute.java +++ 
b/src/main/java/eu/dissco/core/digitalspecimenprocessor/domain/HandleAttribute.java @@ -1,5 +1,36 @@ package eu.dissco.core.digitalspecimenprocessor.domain; +import java.util.Arrays; +import java.util.Objects; + public record HandleAttribute(int index, String type, byte[] data) { + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + HandleAttribute that = (HandleAttribute) o; + return index == that.index && Objects.equals(type, that.type) + && Arrays.equals(data, that.data); + } + + @Override + public int hashCode() { + int result = Objects.hash(index, type); + result = 31 * result + Arrays.hashCode(data); + return result; + } + + @Override + public String toString() { + return "HandleAttribute{" + + "index=" + index + + ", type='" + type + '\'' + + ", data=" + Arrays.toString(data) + + '}'; + } } diff --git a/src/main/java/eu/dissco/core/digitalspecimenprocessor/domain/UpdatedDigitalSpecimenRecord.java b/src/main/java/eu/dissco/core/digitalspecimenprocessor/domain/UpdatedDigitalSpecimenRecord.java new file mode 100644 index 0000000..991e26f --- /dev/null +++ b/src/main/java/eu/dissco/core/digitalspecimenprocessor/domain/UpdatedDigitalSpecimenRecord.java @@ -0,0 +1,9 @@ +package eu.dissco.core.digitalspecimenprocessor.domain; + +import java.util.List; + +public record UpdatedDigitalSpecimenRecord(DigitalSpecimenRecord digitalSpecimenRecord, + List enrichment, + DigitalSpecimenRecord currentDigitalSpecimen) { + +} diff --git a/src/main/java/eu/dissco/core/digitalspecimenprocessor/domain/UpdatedDigitalSpecimenTuple.java b/src/main/java/eu/dissco/core/digitalspecimenprocessor/domain/UpdatedDigitalSpecimenTuple.java index b26e3b5..8225be7 100644 --- a/src/main/java/eu/dissco/core/digitalspecimenprocessor/domain/UpdatedDigitalSpecimenTuple.java +++ b/src/main/java/eu/dissco/core/digitalspecimenprocessor/domain/UpdatedDigitalSpecimenTuple.java @@ -1,6 +1,6 @@ package 
eu.dissco.core.digitalspecimenprocessor.domain; public record UpdatedDigitalSpecimenTuple(DigitalSpecimenRecord currentSpecimen, - DigitalSpecimen digitalSpecimen) { + DigitalSpecimenEvent digitalSpecimenEvent) { } diff --git a/src/main/java/eu/dissco/core/digitalspecimenprocessor/exception/DisscoJsonBMappingException.java b/src/main/java/eu/dissco/core/digitalspecimenprocessor/exception/DisscoJsonBMappingException.java new file mode 100644 index 0000000..bbbd0f1 --- /dev/null +++ b/src/main/java/eu/dissco/core/digitalspecimenprocessor/exception/DisscoJsonBMappingException.java @@ -0,0 +1,10 @@ +package eu.dissco.core.digitalspecimenprocessor.exception; + +import org.springframework.dao.DataAccessException; + +public class DisscoJsonBMappingException extends DataAccessException { + + public DisscoJsonBMappingException(String msg, Throwable cause) { + super(msg, cause); + } +} diff --git a/src/main/java/eu/dissco/core/digitalspecimenprocessor/exception/DisscoRepositoryException.java b/src/main/java/eu/dissco/core/digitalspecimenprocessor/exception/DisscoRepositoryException.java new file mode 100644 index 0000000..284e55a --- /dev/null +++ b/src/main/java/eu/dissco/core/digitalspecimenprocessor/exception/DisscoRepositoryException.java @@ -0,0 +1,12 @@ +package eu.dissco.core.digitalspecimenprocessor.exception; + +public class DisscoRepositoryException extends Exception { + + public DisscoRepositoryException(String message) { + super(message); + } + + public DisscoRepositoryException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/src/main/java/eu/dissco/core/digitalspecimenprocessor/repository/DigitalSpecimenRepository.java b/src/main/java/eu/dissco/core/digitalspecimenprocessor/repository/DigitalSpecimenRepository.java index 67d01dc..a749eb0 100644 --- a/src/main/java/eu/dissco/core/digitalspecimenprocessor/repository/DigitalSpecimenRepository.java +++
b/src/main/java/eu/dissco/core/digitalspecimenprocessor/repository/DigitalSpecimenRepository.java @@ -3,9 +3,12 @@ import static eu.dissco.core.digitalspecimenprocessor.database.jooq.Tables.NEW_DIGITAL_SPECIMEN; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dissco.core.digitalspecimenprocessor.domain.DigitalSpecimen; import eu.dissco.core.digitalspecimenprocessor.domain.DigitalSpecimenRecord; +import eu.dissco.core.digitalspecimenprocessor.exception.DisscoJsonBMappingException; +import eu.dissco.core.digitalspecimenprocessor.exception.DisscoRepositoryException; import java.time.Instant; import java.util.Collection; import java.util.List; @@ -14,6 +17,7 @@ import org.jooq.JSONB; import org.jooq.Query; import org.jooq.Record; +import org.jooq.exception.DataAccessException; import org.springframework.stereotype.Repository; @Repository @@ -25,21 +29,18 @@ public class DigitalSpecimenRepository { private DigitalSpecimenRecord mapDigitalSpecimen(Record dbRecord) { DigitalSpecimen digitalSpecimen = null; - try { - digitalSpecimen = new DigitalSpecimen(dbRecord.get(NEW_DIGITAL_SPECIMEN.TYPE), - dbRecord.get(NEW_DIGITAL_SPECIMEN.PHYSICAL_SPECIMEN_ID), - dbRecord.get(NEW_DIGITAL_SPECIMEN.PHYSICAL_SPECIMEN_TYPE), - dbRecord.get(NEW_DIGITAL_SPECIMEN.SPECIMEN_NAME), - dbRecord.get(NEW_DIGITAL_SPECIMEN.ORGANIZATION_ID), - dbRecord.get(NEW_DIGITAL_SPECIMEN.DATASET), - dbRecord.get(NEW_DIGITAL_SPECIMEN.PHYSICAL_SPECIMEN_COLLECTION), - dbRecord.get(NEW_DIGITAL_SPECIMEN.SOURCE_SYSTEM_ID), - mapper.readTree(dbRecord.get(NEW_DIGITAL_SPECIMEN.DATA).data()), - mapper.readTree(dbRecord.get(NEW_DIGITAL_SPECIMEN.ORIGINAL_DATA).data()), - dbRecord.get(NEW_DIGITAL_SPECIMEN.DWCA_ID)); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } + digitalSpecimen = new DigitalSpecimen(dbRecord.get(NEW_DIGITAL_SPECIMEN.TYPE), + 
dbRecord.get(NEW_DIGITAL_SPECIMEN.PHYSICAL_SPECIMEN_ID), + dbRecord.get(NEW_DIGITAL_SPECIMEN.PHYSICAL_SPECIMEN_TYPE), + dbRecord.get(NEW_DIGITAL_SPECIMEN.SPECIMEN_NAME), + dbRecord.get(NEW_DIGITAL_SPECIMEN.ORGANIZATION_ID), + dbRecord.get(NEW_DIGITAL_SPECIMEN.DATASET), + dbRecord.get(NEW_DIGITAL_SPECIMEN.PHYSICAL_SPECIMEN_COLLECTION), + dbRecord.get(NEW_DIGITAL_SPECIMEN.SOURCE_SYSTEM_ID), + mapToJson(dbRecord.get(NEW_DIGITAL_SPECIMEN.DATA)), + mapToJson(dbRecord.get(NEW_DIGITAL_SPECIMEN.ORIGINAL_DATA)), + dbRecord.get(NEW_DIGITAL_SPECIMEN.DWCA_ID)); + return new DigitalSpecimenRecord( dbRecord.get(NEW_DIGITAL_SPECIMEN.ID), dbRecord.get(NEW_DIGITAL_SPECIMEN.MIDSLEVEL), @@ -48,6 +49,16 @@ private DigitalSpecimenRecord mapDigitalSpecimen(Record dbRecord) { digitalSpecimen); } + private JsonNode mapToJson(JSONB jsonb) { + try { + return mapper.readTree(jsonb.data()); + } catch (JsonProcessingException e) { + throw new DisscoJsonBMappingException( + "Failed to parse jsonb field to json: " + jsonb.data(), e); + } + } + + public int[] createDigitalSpecimenRecord( Collection digitalSpecimenRecords) { var queries = digitalSpecimenRecords.stream().map(this::specimenToQuery).toList(); @@ -89,12 +100,31 @@ public int updateLastChecked(List currentDigitalSpecimen) { .execute(); } - public List getDigitalSpecimens(List specimenList) { - return context.select(NEW_DIGITAL_SPECIMEN.asterisk()) - .distinctOn(NEW_DIGITAL_SPECIMEN.ID) - .from(NEW_DIGITAL_SPECIMEN) - .where(NEW_DIGITAL_SPECIMEN.PHYSICAL_SPECIMEN_ID.in(specimenList)) - .orderBy(NEW_DIGITAL_SPECIMEN.ID, NEW_DIGITAL_SPECIMEN.VERSION.desc()) - .fetch(this::mapDigitalSpecimen); + public List getDigitalSpecimens(List specimenList) + throws DisscoRepositoryException { + try { + return context.select(NEW_DIGITAL_SPECIMEN.asterisk()) + .distinctOn(NEW_DIGITAL_SPECIMEN.ID) + .from(NEW_DIGITAL_SPECIMEN) + .where(NEW_DIGITAL_SPECIMEN.PHYSICAL_SPECIMEN_ID.in(specimenList)) + .orderBy(NEW_DIGITAL_SPECIMEN.ID, 
NEW_DIGITAL_SPECIMEN.VERSION.desc()) + .fetch(this::mapDigitalSpecimen); + } catch (DataAccessException ex) { + throw new DisscoRepositoryException( + "Failed to get specimen from repository: " + specimenList, ex); + } + } + + public void rollbackSpecimen(String handle) { + context.delete(NEW_DIGITAL_SPECIMEN) + .where(NEW_DIGITAL_SPECIMEN.ID.eq(handle)) + .execute(); + } + + public void deleteVersion(DigitalSpecimenRecord digitalSpecimenRecord) { + context.delete(NEW_DIGITAL_SPECIMEN) + .where(NEW_DIGITAL_SPECIMEN.ID.eq(digitalSpecimenRecord.id())) + .and(NEW_DIGITAL_SPECIMEN.VERSION.eq(digitalSpecimenRecord.version())) + .execute(); } } diff --git a/src/main/java/eu/dissco/core/digitalspecimenprocessor/repository/ElasticSearchRepository.java b/src/main/java/eu/dissco/core/digitalspecimenprocessor/repository/ElasticSearchRepository.java index aa7afb0..bcb99ee 100644 --- a/src/main/java/eu/dissco/core/digitalspecimenprocessor/repository/ElasticSearchRepository.java +++ b/src/main/java/eu/dissco/core/digitalspecimenprocessor/repository/ElasticSearchRepository.java @@ -3,6 +3,7 @@ import co.elastic.clients.elasticsearch.ElasticsearchClient; import co.elastic.clients.elasticsearch.core.BulkRequest; import co.elastic.clients.elasticsearch.core.BulkResponse; +import co.elastic.clients.elasticsearch.core.DeleteResponse; import eu.dissco.core.digitalspecimenprocessor.domain.DigitalSpecimenRecord; import eu.dissco.core.digitalspecimenprocessor.property.ElasticSearchProperties; import java.io.IOException; @@ -30,4 +31,14 @@ public BulkResponse indexDigitalSpecimen(Collection digit } return client.bulk(bulkRequest.build()); } + + public DeleteResponse rollbackSpecimen(DigitalSpecimenRecord digitalSpecimenRecord) + throws IOException { + return client.delete(d -> d.index(properties.getIndexName()).id(digitalSpecimenRecord.id())); + } + + public void rollbackVersion(DigitalSpecimenRecord currentDigitalSpecimen) throws IOException { + client.index(i ->
i.index(properties.getIndexName()).id(currentDigitalSpecimen.id()) + .document(currentDigitalSpecimen)); + } } diff --git a/src/main/java/eu/dissco/core/digitalspecimenprocessor/repository/HandleRepository.java b/src/main/java/eu/dissco/core/digitalspecimenprocessor/repository/HandleRepository.java index ccf358d..9a0d9bd 100644 --- a/src/main/java/eu/dissco/core/digitalspecimenprocessor/repository/HandleRepository.java +++ b/src/main/java/eu/dissco/core/digitalspecimenprocessor/repository/HandleRepository.java @@ -39,7 +39,7 @@ public void createHandle(String handle, Instant recordTimestamp, } public void updateHandleAttributes(String id, Instant recordTimestamp, - List handleAttributes) { + List handleAttributes, boolean versionIncrement) { var queryList = new ArrayList(); for (var handleAttribute : handleAttributes) { var query = context.update(HANDLES) @@ -49,11 +49,11 @@ public void updateHandleAttributes(String id, Instant recordTimestamp, .and(HANDLES.IDX.eq(handleAttribute.index())); queryList.add(query); } - queryList.add(versionIncrement(id, recordTimestamp)); + queryList.add(versionIncrement(id, recordTimestamp, versionIncrement)); context.batch(queryList).execute(); } - private Query versionIncrement(String pid, Instant recordTimestamp) { + private Query versionIncrement(String pid, Instant recordTimestamp, boolean versionIncrement) { var currentVersion = Integer.parseInt(context.select(HANDLES.DATA) .from(HANDLES) @@ -61,7 +61,13 @@ private Query versionIncrement(String pid, Instant recordTimestamp) { StandardCharsets.UTF_8))) .and(HANDLES.TYPE.eq("issueNumber".getBytes(StandardCharsets.UTF_8))) .fetchOne(dbRecord -> new String(dbRecord.value1()))); - var version = currentVersion + 1; + int version; + if (versionIncrement){ + version = currentVersion + 1; + } else { + version = currentVersion - 1; + } + return context.update(HANDLES) .set(HANDLES.DATA, String.valueOf(version).getBytes(StandardCharsets.UTF_8)) .set(HANDLES.TIMESTAMP, 
recordTimestamp.getEpochSecond()) @@ -69,4 +75,11 @@ private Query versionIncrement(String pid, Instant recordTimestamp) { StandardCharsets.UTF_8))) .and(HANDLES.TYPE.eq("issueNumber".getBytes(StandardCharsets.UTF_8))); } + + public void rollbackHandleCreation(String id) { + context.delete(HANDLES) + .where(HANDLES.HANDLE.eq(id.getBytes(StandardCharsets.UTF_8))) + .execute(); + } + } diff --git a/src/main/java/eu/dissco/core/digitalspecimenprocessor/service/HandleService.java b/src/main/java/eu/dissco/core/digitalspecimenprocessor/service/HandleService.java index 5909e58..b98d964 100644 --- a/src/main/java/eu/dissco/core/digitalspecimenprocessor/service/HandleService.java +++ b/src/main/java/eu/dissco/core/digitalspecimenprocessor/service/HandleService.java @@ -2,6 +2,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import eu.dissco.core.digitalspecimenprocessor.domain.DigitalSpecimen; +import eu.dissco.core.digitalspecimenprocessor.domain.DigitalSpecimenRecord; import eu.dissco.core.digitalspecimenprocessor.domain.HandleAttribute; import eu.dissco.core.digitalspecimenprocessor.domain.UpdatedDigitalSpecimenTuple; import eu.dissco.core.digitalspecimenprocessor.repository.HandleRepository; @@ -30,12 +31,18 @@ public class HandleService { private static final String PREFIX = "20.5000.1025/"; + private static final String HANDLE = "Handle"; + private static final String DIGITAL_OBJECT_SUBTYPE = "digitalObjectSubtype"; + private static final String SPECIMEN_HOST = "specimenHost"; + private static final String TO_BE_FIXED = "Needs to be fixed!"; + private static final String DUMMY_HANDLE = "http://hdl.handle.net/21..."; private final Random random; private final char[] symbols = "ABCDEFGHJKLMNPQRSTVWXYZ1234567890".toCharArray(); private final char[] buffer = new char[11]; private final ObjectMapper mapper; private final DocumentBuilder documentBuilder; private final HandleRepository repository; + private final TransformerFactory transformerFactory; public String 
createNewHandle(DigitalSpecimen digitalSpecimen) throws TransformerException { @@ -56,9 +63,9 @@ private List fillPidRecord(DigitalSpecimen digitalSpecimen, Str handleAttributes.add(new HandleAttribute(2, "pidIssuer", createPidReference("https://doi.org/10.22/10.22/2AA-GAA-E29", "DOI", "RA Issuing DOI"))); handleAttributes.add(new HandleAttribute(3, "digitalObjectType", - createPidReference("http://hdl.handle.net/21...", "Handle", "Digital Specimen"))); - handleAttributes.add(new HandleAttribute(4, "digitalObjectSubtype", - createPidReference("https://hdl.handle.net/21...", "Handle", digitalSpecimen.type()))); + createPidReference(DUMMY_HANDLE, HANDLE, "Digital Specimen"))); + handleAttributes.add(new HandleAttribute(4, DIGITAL_OBJECT_SUBTYPE, + createPidReference("https://hdl.handle.net/21...", HANDLE, digitalSpecimen.type()))); handleAttributes.add(new HandleAttribute(5, "10320/loc", createLocations(handle))); handleAttributes.add(new HandleAttribute(6, "issueDate", createIssueDate(recordTimestamp))); handleAttributes.add( @@ -70,8 +77,8 @@ private List fillPidRecord(DigitalSpecimen digitalSpecimen, Str "https://creativecommons.org/publicdomain/zero/1.0/".getBytes(StandardCharsets.UTF_8))); handleAttributes.add(new HandleAttribute(14, "digitalOrPhysical", "physical".getBytes( StandardCharsets.UTF_8))); - handleAttributes.add(new HandleAttribute(15, "specimenHost", createPidReference( - digitalSpecimen.organizationId(), "ROR", "Needs to be fixed!"))); + handleAttributes.add(new HandleAttribute(15, SPECIMEN_HOST, createPidReference( + digitalSpecimen.organizationId(), "ROR", TO_BE_FIXED))); handleAttributes.add(new HandleAttribute(100, "HS_ADMIN", decodeAdmin())); return handleAttributes; } @@ -94,8 +101,7 @@ private byte[] createLocations(String handle) throws TransformerException { } private String documentToString(Document document) throws TransformerException { - var tf = TransformerFactory.newInstance(); - var transformer = tf.newTransformer(); + var 
transformer = transformerFactory.newTransformer(); transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes"); StringWriter writer = new StringWriter(); transformer.transform(new DOMSource(document), new StreamResult(writer)); @@ -141,28 +147,38 @@ private byte hexToByte(String hexString) { } private int toDigit(char hexChar) { - int digit = Character.digit(hexChar, 16); - return digit; + return Character.digit(hexChar, 16); } private void updateHandle(String id, DigitalSpecimen digitalSpecimen) { var handleAttributes = updatedHandles(digitalSpecimen); var recordTimestamp = Instant.now(); - repository.updateHandleAttributes(id, recordTimestamp, handleAttributes); + repository.updateHandleAttributes(id, recordTimestamp, handleAttributes, true); } private List updatedHandles(DigitalSpecimen digitalSpecimen) { var handleAttributes = new ArrayList(); - handleAttributes.add(new HandleAttribute(4, "digitalObjectSubtype", - createPidReference("http://hdl.handle.net/21...", "Handle", digitalSpecimen.type()))); - handleAttributes.add(new HandleAttribute(15, "specimenHost", createPidReference( - digitalSpecimen.organizationId(), "ROR", "Needs to be fixed!"))); + handleAttributes.add(new HandleAttribute(4, DIGITAL_OBJECT_SUBTYPE, + createPidReference(DUMMY_HANDLE, HANDLE, digitalSpecimen.type()))); + handleAttributes.add(new HandleAttribute(15, SPECIMEN_HOST, createPidReference( + digitalSpecimen.organizationId(), "ROR", TO_BE_FIXED))); return handleAttributes; } public void updateHandles(List handleUpdates) { for (var handleUpdate : handleUpdates) { - updateHandle(handleUpdate.currentSpecimen().id(), handleUpdate.digitalSpecimen()); + updateHandle(handleUpdate.currentSpecimen().id(), handleUpdate.digitalSpecimenEvent() + .digitalSpecimen()); } } + + public void rollbackHandleCreation(DigitalSpecimenRecord digitalSpecimenRecord) { + repository.rollbackHandleCreation(digitalSpecimenRecord.id()); + } + + public void deleteVersion(DigitalSpecimenRecord 
currentDigitalSpecimen) { + var handleAttributes = updatedHandles(currentDigitalSpecimen.digitalSpecimen()); + repository.updateHandleAttributes(currentDigitalSpecimen.id(), Instant.now(), handleAttributes, + false); + } } diff --git a/src/main/java/eu/dissco/core/digitalspecimenprocessor/service/KafkaConsumerService.java b/src/main/java/eu/dissco/core/digitalspecimenprocessor/service/KafkaConsumerService.java index 310e7a5..8b4fe45 100644 --- a/src/main/java/eu/dissco/core/digitalspecimenprocessor/service/KafkaConsumerService.java +++ b/src/main/java/eu/dissco/core/digitalspecimenprocessor/service/KafkaConsumerService.java @@ -21,6 +21,7 @@ public class KafkaConsumerService { private final ObjectMapper mapper; private final ProcessingService processingService; + private final KafkaPublisherService publisherService; @KafkaListener(topics = "${kafka.consumer.topic}") public void getMessages(@Payload List messages) { @@ -28,8 +29,8 @@ public void getMessages(@Payload List messages) { try { return mapper.readValue(message, DigitalSpecimenEvent.class); } catch (JsonProcessingException e) { - log.error("Failed to process message", e); - // TODO move message to DLQ + log.error("Moving message to DLQ, failed to parse event message", e); + publisherService.deadLetterRaw(message); return null; } }).filter(Objects::nonNull).toList(); diff --git a/src/main/java/eu/dissco/core/digitalspecimenprocessor/service/KafkaPublisherService.java b/src/main/java/eu/dissco/core/digitalspecimenprocessor/service/KafkaPublisherService.java index 3d4a4f5..94987d9 100644 --- a/src/main/java/eu/dissco/core/digitalspecimenprocessor/service/KafkaPublisherService.java +++ b/src/main/java/eu/dissco/core/digitalspecimenprocessor/service/KafkaPublisherService.java @@ -5,6 +5,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.github.fge.jsonpatch.diff.JsonDiff; import eu.dissco.core.digitalspecimenprocessor.domain.CreateUpdateDeleteEvent; +import 
eu.dissco.core.digitalspecimenprocessor.domain.DigitalSpecimenEvent; import eu.dissco.core.digitalspecimenprocessor.domain.DigitalSpecimenRecord; import java.time.Instant; import java.util.UUID; @@ -19,37 +20,39 @@ public class KafkaPublisherService { private final ObjectMapper mapper; private final KafkaTemplate kafkaTemplate; - public void publishCreateEvent(DigitalSpecimenRecord digitalSpecimenRecord) { - var event = new CreateUpdateDeleteEvent(UUID.randomUUID(), "create", "digital-specimen-processing-service", + public void publishCreateEvent(DigitalSpecimenRecord digitalSpecimenRecord) + throws JsonProcessingException { + var event = new CreateUpdateDeleteEvent(UUID.randomUUID(), "create", + "digital-specimen-processing-service", digitalSpecimenRecord.id(), Instant.now(), mapper.valueToTree(digitalSpecimenRecord), "Specimen newly created"); - try { - kafkaTemplate.send("createUpdateDeleteTopic", mapper.writeValueAsString(event)); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } + kafkaTemplate.send("createUpdateDeleteTopic", mapper.writeValueAsString(event)); } public void publishAnnotationRequestEvent(String enrichment, - DigitalSpecimenRecord digitalSpecimenRecord) { - try { - kafkaTemplate.send(enrichment, mapper.writeValueAsString(digitalSpecimenRecord)); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } + DigitalSpecimenRecord digitalSpecimenRecord) throws JsonProcessingException { + kafkaTemplate.send(enrichment, mapper.writeValueAsString(digitalSpecimenRecord)); } public void publishUpdateEvent(DigitalSpecimenRecord digitalSpecimenRecord, - DigitalSpecimenRecord currentDigitalSpecimen) { + DigitalSpecimenRecord currentDigitalSpecimen) throws JsonProcessingException { var jsonPatch = createJsonPatch(currentDigitalSpecimen, digitalSpecimenRecord); var event = new CreateUpdateDeleteEvent(UUID.randomUUID(), "update", "processing-service", digitalSpecimenRecord.id(), Instant.now(), jsonPatch, "Specimen 
has been updated"); - try { - kafkaTemplate.send("createUpdateDeleteTopic", mapper.writeValueAsString(event)); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } + kafkaTemplate.send("createUpdateDeleteTopic", mapper.writeValueAsString(event)); + } + + public void republishEvent(DigitalSpecimenEvent event) throws JsonProcessingException { + kafkaTemplate.send("digital-specimen", mapper.writeValueAsString(event)); + } + + public void deadLetterEvent(DigitalSpecimenEvent event) throws JsonProcessingException { + kafkaTemplate.send("digital-specimen-dlq", mapper.writeValueAsString(event)); + } + + public void deadLetterRaw(String event) { + kafkaTemplate.send("digital-specimen-dlq", event); } private JsonNode createJsonPatch(DigitalSpecimenRecord currentDigitalSpecimen, diff --git a/src/main/java/eu/dissco/core/digitalspecimenprocessor/service/ProcessingService.java b/src/main/java/eu/dissco/core/digitalspecimenprocessor/service/ProcessingService.java index 50b3163..b6b85e8 100644 --- a/src/main/java/eu/dissco/core/digitalspecimenprocessor/service/ProcessingService.java +++ b/src/main/java/eu/dissco/core/digitalspecimenprocessor/service/ProcessingService.java @@ -1,19 +1,24 @@ package eu.dissco.core.digitalspecimenprocessor.service; import co.elastic.clients.elasticsearch.core.BulkResponse; +import com.fasterxml.jackson.core.JsonProcessingException; import eu.dissco.core.digitalspecimenprocessor.domain.DigitalSpecimen; import eu.dissco.core.digitalspecimenprocessor.domain.DigitalSpecimenEvent; import eu.dissco.core.digitalspecimenprocessor.domain.DigitalSpecimenRecord; import eu.dissco.core.digitalspecimenprocessor.domain.ProcessResult; +import eu.dissco.core.digitalspecimenprocessor.domain.UpdatedDigitalSpecimenRecord; import eu.dissco.core.digitalspecimenprocessor.domain.UpdatedDigitalSpecimenTuple; +import eu.dissco.core.digitalspecimenprocessor.exception.DisscoRepositoryException; import 
eu.dissco.core.digitalspecimenprocessor.repository.DigitalSpecimenRepository; import eu.dissco.core.digitalspecimenprocessor.repository.ElasticSearchRepository; import java.io.IOException; import java.time.Instant; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; @@ -34,7 +39,8 @@ public class ProcessingService { public List handleMessages(List events) { log.info("Processing {} digital specimen", events.size()); - var processResult = processSpecimens(events); + var uniqueBatch = removeDuplicatesInBatch(events); + var processResult = processSpecimens(uniqueBatch); var results = new ArrayList(); if (!processResult.equalSpecimens().isEmpty()) { updateEqualSpecimen(processResult.equalSpecimens()); @@ -48,35 +54,71 @@ public List handleMessages(List eve return results; } - private ProcessResult processSpecimens(List events) { - var currentSpecimens = getCurrentSpecimen(events); - var equalSpecimens = new ArrayList(); - var changedSpecimens = new ArrayList(); - var newSpecimens = new ArrayList(); - - for (DigitalSpecimenEvent event : events) { - var digitalSpecimen = event.digitalSpecimen(); - log.debug("ds: {}", digitalSpecimen); - if (!currentSpecimens.containsKey(digitalSpecimen.physicalSpecimenId())) { - log.debug("Specimen with id: {} is completely new", digitalSpecimen.physicalSpecimenId()); - newSpecimens.add(event); + private Set removeDuplicatesInBatch(List events) { + var uniqueSet = new HashSet(); + var map = events.stream() + .collect(Collectors.groupingBy(event -> event.digitalSpecimen().physicalSpecimenId())); + for (Entry> entry : map.entrySet()) { + if (entry.getValue().size() > 1) { + log.warn("Found {} duplicates in batch for id {}", entry.getValue().size(), entry.getKey()); + for (int i = 0; i < entry.getValue().size(); i++) { + if (i == 0) { + 
uniqueSet.add(entry.getValue().get(i)); + } else { + republishEvent(entry.getValue().get(i)); + } + } } else { - var currentDigitalSpecimen = currentSpecimens.get(digitalSpecimen.physicalSpecimenId()); - if (currentDigitalSpecimen.digitalSpecimen().equals(digitalSpecimen)) { - log.debug("Received digital specimen is equal to digital specimen: {}", - currentDigitalSpecimen.id()); - equalSpecimens.add(currentDigitalSpecimen); + uniqueSet.add(entry.getValue().get(0)); + } + } + return uniqueSet; + } + + private ProcessResult processSpecimens(Set events) { + try { + var currentSpecimens = getCurrentSpecimen(events); + var equalSpecimens = new ArrayList(); + var changedSpecimens = new ArrayList(); + var newSpecimens = new ArrayList(); + + for (DigitalSpecimenEvent event : events) { + var digitalSpecimen = event.digitalSpecimen(); + log.debug("ds: {}", digitalSpecimen); + if (!currentSpecimens.containsKey(digitalSpecimen.physicalSpecimenId())) { + log.debug("Specimen with id: {} is completely new", digitalSpecimen.physicalSpecimenId()); + newSpecimens.add(event); } else { - log.debug("Specimen with id: {} has received an update", currentDigitalSpecimen.id()); - changedSpecimens.add( - new UpdatedDigitalSpecimenTuple(currentDigitalSpecimen, digitalSpecimen)); + var currentDigitalSpecimen = currentSpecimens.get(digitalSpecimen.physicalSpecimenId()); + if (currentDigitalSpecimen.digitalSpecimen().equals(digitalSpecimen)) { + log.debug("Received digital specimen is equal to digital specimen: {}", + currentDigitalSpecimen.id()); + equalSpecimens.add(currentDigitalSpecimen); + } else { + log.debug("Specimen with id: {} has received an update", currentDigitalSpecimen.id()); + changedSpecimens.add( + new UpdatedDigitalSpecimenTuple(currentDigitalSpecimen, event)); + } } } + return new ProcessResult(equalSpecimens, changedSpecimens, newSpecimens); + } catch (DisscoRepositoryException ex) { + log.error("Republishing messages, Unable to retrieve current specimen from repository", 
ex); + events.forEach(this::republishEvent); + return new ProcessResult(List.of(), List.of(), List.of()); } - return new ProcessResult(equalSpecimens, changedSpecimens, newSpecimens); } - private Map getCurrentSpecimen(List events) { + private void republishEvent(DigitalSpecimenEvent event) { + try { + kafkaService.republishEvent(event); + } catch (JsonProcessingException e) { + log.error("Fatal exception, unable to republish message due to invalid json", e); + } + } + + private Map getCurrentSpecimen(Set events) + throws DisscoRepositoryException { return repository.getDigitalSpecimens( events.stream().map(event -> event.digitalSpecimen().physicalSpecimenId()).toList()) .stream().collect( @@ -94,50 +136,134 @@ private void updateEqualSpecimen(List currentDigitalSpeci private Set updateExistingDigitalSpecimen( List updatedDigitalSpecimenTuples) { - var handleUpdates = updatedDigitalSpecimenTuples.stream().filter( - tuple -> handleNeedsUpdate(tuple.currentSpecimen().digitalSpecimen(), - tuple.digitalSpecimen())).toList(); - if (!handleUpdates.isEmpty()){ - handleService.updateHandles(handleUpdates); - } + updateHandles(updatedDigitalSpecimenTuples); - var digitalSpecimenRecords = updatedDigitalSpecimenTuples.stream().collect(Collectors.toMap( - tuple -> new DigitalSpecimenRecord( - tuple.currentSpecimen().id(), - calculateMidsLevel(tuple.digitalSpecimen()), - tuple.currentSpecimen().version() + 1, - Instant.now(), - tuple.digitalSpecimen() - ), UpdatedDigitalSpecimenTuple::currentSpecimen)); + var digitalSpecimenRecords = getSpecimenRecordMap(updatedDigitalSpecimenTuples); log.info("Persisting to db"); - repository.createDigitalSpecimenRecord(digitalSpecimenRecords.keySet()); + repository.createDigitalSpecimenRecord( + digitalSpecimenRecords.stream().map(UpdatedDigitalSpecimenRecord::digitalSpecimenRecord) + .toList()); log.info("Persisting to elastic"); - BulkResponse bulkResponse = null; try { - bulkResponse = 
elasticRepository.indexDigitalSpecimen(digitalSpecimenRecords.keySet()); + var bulkResponse = elasticRepository.indexDigitalSpecimen( + digitalSpecimenRecords.stream().map(UpdatedDigitalSpecimenRecord::digitalSpecimenRecord) + .toList()); if (!bulkResponse.errors()) { - log.debug("Successfully indexed {} specimens", digitalSpecimenRecords); - digitalSpecimenRecords.forEach(kafkaService::publishUpdateEvent); + handleSuccessfulElasticUpdate(digitalSpecimenRecords); } else { - var digitalSpecimenMap = digitalSpecimenRecords.values().stream() - .collect(Collectors.toMap(DigitalSpecimenRecord::id, Function.identity())); - bulkResponse.items().forEach( - item -> { - var digitalSpecimenRecord = digitalSpecimenMap.get(item.id()); - if (item.error() != null) { - // TODO Rollback database (remove version) and move message to DLQ - digitalSpecimenRecords.remove(digitalSpecimenRecord); - } else { - kafkaService.publishUpdateEvent(digitalSpecimenRecord, - digitalSpecimenRecords.get(digitalSpecimenRecord)); - } - } - ); + handlePartiallyElasticUpdate(digitalSpecimenRecords, bulkResponse); } - log.info("Successfully updated {} digitalSpecimen", updatedDigitalSpecimenTuples.size()); - return digitalSpecimenRecords.keySet(); + var successfullyProcessedRecords = digitalSpecimenRecords.stream() + .map(UpdatedDigitalSpecimenRecord::digitalSpecimenRecord).collect( + Collectors.toSet()); + log.info("Successfully updated {} digitalSpecimen", successfullyProcessedRecords.size()); + return successfullyProcessedRecords; } catch (IOException e) { - throw new RuntimeException(e); + log.error("Rolling back, failed to insert records in elastic", e); + digitalSpecimenRecords.forEach( + updatedDigitalSpecimenRecord -> rollbackUpdatedSpecimen(updatedDigitalSpecimenRecord, + false)); + return Set.of(); + } + } + + private void handleSuccessfulElasticUpdate( + Set digitalSpecimenRecords) { + log.debug("Successfully indexed {} specimens", digitalSpecimenRecords); + var failedRecords = new HashSet(); 
+ for (var digitalSpecimenRecord : digitalSpecimenRecords) { + var successfullyPublished = publishUpdateEvent(digitalSpecimenRecord); + if (!successfullyPublished) { + failedRecords.add(digitalSpecimenRecord); + } + } + digitalSpecimenRecords.removeAll(failedRecords); + } + + private void handlePartiallyElasticUpdate( + Set digitalSpecimenRecords, + BulkResponse bulkResponse) { + var digitalSpecimenMap = digitalSpecimenRecords.stream() + .collect(Collectors.toMap( + updatedDigitalSpecimenRecord -> updatedDigitalSpecimenRecord.digitalSpecimenRecord() + .id(), Function.identity())); + bulkResponse.items().forEach( + item -> { + var digitalSpecimenRecord = digitalSpecimenMap.get(item.id()); + if (item.error() != null) { + log.error("Failed item to insert into elastic search: {} with errors {}", + digitalSpecimenRecord.digitalSpecimenRecord().id(), item.error().reason()); + rollbackUpdatedSpecimen(digitalSpecimenRecord, false); + digitalSpecimenRecords.remove(digitalSpecimenRecord); + } else { + var successfullyPublished = publishUpdateEvent(digitalSpecimenRecord); + if (!successfullyPublished) { + digitalSpecimenRecords.remove(digitalSpecimenRecord); + } + } + } + ); + } + + private Set getSpecimenRecordMap( + List updatedDigitalSpecimenTuples) { + return updatedDigitalSpecimenTuples.stream().map(tuple -> new UpdatedDigitalSpecimenRecord( + new DigitalSpecimenRecord( + tuple.currentSpecimen().id(), + 1, + tuple.currentSpecimen().version() + 1, + Instant.now(), + tuple.digitalSpecimenEvent().digitalSpecimen()), + tuple.digitalSpecimenEvent().enrichmentList(), + tuple.currentSpecimen() + )).collect(Collectors.toSet()); + } + + private void updateHandles(List updatedDigitalSpecimenTuples) { + var handleUpdates = updatedDigitalSpecimenTuples.stream().filter( + tuple -> handleNeedsUpdate(tuple.currentSpecimen().digitalSpecimen(), + tuple.digitalSpecimenEvent().digitalSpecimen())).toList(); + if (!handleUpdates.isEmpty()) { + handleService.updateHandles(handleUpdates); + } 
+ } + + private boolean publishUpdateEvent(UpdatedDigitalSpecimenRecord updatedDigitalSpecimenRecord) { + try { + kafkaService.publishUpdateEvent(updatedDigitalSpecimenRecord.digitalSpecimenRecord(), + updatedDigitalSpecimenRecord.currentDigitalSpecimen()); + return true; + } catch (JsonProcessingException e) { + log.error("Rolling back, failed to publish update event", e); + rollbackUpdatedSpecimen(updatedDigitalSpecimenRecord, true); + return false; + } + } + + private void rollbackUpdatedSpecimen(UpdatedDigitalSpecimenRecord updatedDigitalSpecimenRecord, + boolean elasticRollback) { + if (elasticRollback) { + try { + elasticRepository.rollbackVersion(updatedDigitalSpecimenRecord.currentDigitalSpecimen()); + } catch (IOException e) { + log.error("Fatal exception, unable to roll back update for: " + + updatedDigitalSpecimenRecord.currentDigitalSpecimen(), e); + } + } + repository.deleteVersion(updatedDigitalSpecimenRecord.digitalSpecimenRecord()); + if (handleNeedsUpdate(updatedDigitalSpecimenRecord.currentDigitalSpecimen().digitalSpecimen(), + updatedDigitalSpecimenRecord.digitalSpecimenRecord() + .digitalSpecimen())) { + handleService.deleteVersion(updatedDigitalSpecimenRecord.currentDigitalSpecimen()); + } + try { + kafkaService.deadLetterEvent( + new DigitalSpecimenEvent(updatedDigitalSpecimenRecord.enrichment(), + updatedDigitalSpecimenRecord.digitalSpecimenRecord() + .digitalSpecimen())); + } catch (JsonProcessingException e) { + log.error("Fatal exception, unable to dead letter queue: " + + updatedDigitalSpecimenRecord.digitalSpecimenRecord().id(), e); } } @@ -149,22 +275,7 @@ private boolean handleNeedsUpdate(DigitalSpecimen currentDigitalSpecimen, private Set createNewDigitalSpecimen(List events) { var digitalSpecimenRecords = events.stream().collect(Collectors.toMap( - event -> { - try { - return new DigitalSpecimenRecord( - handleService.createNewHandle(event.digitalSpecimen()), - calculateMidsLevel(event.digitalSpecimen()), - 1, - Instant.now(), - 
event.digitalSpecimen() - ); - } catch (TransformerException e) { - log.error("Failed to process record with id: {}", - event.digitalSpecimen().physicalSpecimenId(), - e); - return null; - } - }, + this::mapToDigitalSpecimenRecord, DigitalSpecimenEvent::enrichmentList )); digitalSpecimenRecords.remove(null); @@ -176,38 +287,111 @@ private Set createNewDigitalSpecimen(List { - kafkaService.publishCreateEvent(key); - value.forEach(aas -> kafkaService.publishAnnotationRequestEvent(aas, key)); - }); + handleSuccessfulElasticInsert(digitalSpecimenRecords); } else { - var digitalSpecimenMap = digitalSpecimenRecords.keySet().stream() - .collect(Collectors.toMap(DigitalSpecimenRecord::id, Function.identity())); - bulkResponse.items().forEach( - item -> { - var digitalSpecimenRecord = digitalSpecimenMap.get(item.id()); - if (item.error() != null) { - // TODO Rollback database and handle and move message to DLQ - digitalSpecimenRecords.remove(digitalSpecimenRecord); - } else { - kafkaService.publishCreateEvent(digitalSpecimenRecord); - digitalSpecimenRecords.get(digitalSpecimenRecord).forEach( - aas -> kafkaService.publishAnnotationRequestEvent(aas, digitalSpecimenRecord)); - } - } - ); + handlePartiallyFailedElasticInsert(digitalSpecimenRecords, bulkResponse); } log.info("Successfully created {} new digitalSpecimen", digitalSpecimenRecords.size()); return digitalSpecimenRecords.keySet(); } catch (IOException e) { - // TODO rollback all items from database and remove handles - throw new RuntimeException(e); + log.error("Rolling back, failed to insert records in elastic", e); + digitalSpecimenRecords.forEach(this::rollbackNewSpecimen); + return Set.of(); + } + } + + private void handleSuccessfulElasticInsert( + Map> digitalSpecimenRecords) { + log.debug("Successfully indexed {} specimens", digitalSpecimenRecords); + for (var entry : digitalSpecimenRecords.entrySet()) { + var successfullyPublished = publishEvents(entry.getKey(), entry.getValue()); + if (!successfullyPublished) 
{ + digitalSpecimenRecords.remove(entry.getKey()); + } + } + } + + private void handlePartiallyFailedElasticInsert( + Map> digitalSpecimenRecords, + BulkResponse bulkResponse) { + var digitalSpecimenMap = digitalSpecimenRecords.keySet().stream() + .collect(Collectors.toMap(DigitalSpecimenRecord::id, Function.identity())); + bulkResponse.items().forEach( + item -> { + var digitalSpecimenRecord = digitalSpecimenMap.get(item.id()); + if (item.error() != null) { + log.error("Failed item to insert into elastic search: {} with errors {}", + digitalSpecimenRecord.id(), item.error().reason()); + rollbackNewSpecimen(digitalSpecimenRecord, + digitalSpecimenRecords.get(digitalSpecimenRecord)); + digitalSpecimenRecords.remove(digitalSpecimenRecord); + } else { + var successfullyPublished = publishEvents(digitalSpecimenRecord, + digitalSpecimenRecords.get(digitalSpecimenRecord)); + if (!successfullyPublished) { + digitalSpecimenRecords.remove(digitalSpecimenRecord); + } + } + } + ); + } + + private boolean publishEvents(DigitalSpecimenRecord key, List value) { + try { + kafkaService.publishCreateEvent(key); + } catch (JsonProcessingException e) { + log.error("Rolling back, failed to publish Create event", e); + rollbackNewSpecimen(key, value, true); + return false; } + value.forEach(aas -> { + try { + kafkaService.publishAnnotationRequestEvent(aas, key); + } catch (JsonProcessingException e) { + log.error("No action take, failed to publish annotation request event", e); + } + }); + return true; + } + private void rollbackNewSpecimen(DigitalSpecimenRecord digitalSpecimenRecord, + List enrichments) { + rollbackNewSpecimen(digitalSpecimenRecord, enrichments, false); + } + + private void rollbackNewSpecimen(DigitalSpecimenRecord digitalSpecimenRecord, + List enrichments, boolean elasticRollback) { + if (elasticRollback) { + try { + elasticRepository.rollbackSpecimen(digitalSpecimenRecord); + } catch (IOException e) { + log.error("Fatal exception, unable to roll back: " + 
digitalSpecimenRecord.id(), e); + } + } + repository.rollbackSpecimen(digitalSpecimenRecord.id()); + handleService.rollbackHandleCreation(digitalSpecimenRecord); + try { + kafkaService.deadLetterEvent( + new DigitalSpecimenEvent(enrichments, digitalSpecimenRecord.digitalSpecimen())); + } catch (JsonProcessingException e) { + log.error("Fatal exception, unable to dead letter queue: " + digitalSpecimenRecord.id(), e); + } } - private int calculateMidsLevel(DigitalSpecimen digitalSpecimen) { - return 1; + private DigitalSpecimenRecord mapToDigitalSpecimenRecord(DigitalSpecimenEvent event) { + try { + return new DigitalSpecimenRecord( + handleService.createNewHandle(event.digitalSpecimen()), + 1, + 1, + Instant.now(), + event.digitalSpecimen() + ); + } catch (TransformerException e) { + log.error("Failed to process record with id: {}", + event.digitalSpecimen().physicalSpecimenId(), + e); + return null; + } } } diff --git a/src/main/resources/jooq-configuration.xml b/src/main/resources/jooq-configuration.xml new file mode 100644 index 0000000..d7d5c6e --- /dev/null +++ b/src/main/resources/jooq-configuration.xml @@ -0,0 +1,29 @@ + + + org.postgresql.Driver + + + + + + + true + false + + + org.jooq.meta.postgres.PostgresDatabase + public + new_digital_specimen | handles + + + INSTANT + TIMESTAMPTZ + + + + + eu.dissco.core.digitalspecimenprocessor.database.jooq + src/main/java + + + \ No newline at end of file diff --git a/src/test/java/eu/dissco/core/digitalspecimenprocessor/repository/DigitalSpecimenRepositoryIT.java b/src/test/java/eu/dissco/core/digitalspecimenprocessor/repository/DigitalSpecimenRepositoryIT.java index 383c280..d0be446 100644 --- a/src/test/java/eu/dissco/core/digitalspecimenprocessor/repository/DigitalSpecimenRepositoryIT.java +++ b/src/test/java/eu/dissco/core/digitalspecimenprocessor/repository/DigitalSpecimenRepositoryIT.java @@ -4,11 +4,14 @@ import static eu.dissco.core.digitalspecimenprocessor.utils.TestUtils.HANDLE; import static 
eu.dissco.core.digitalspecimenprocessor.utils.TestUtils.MAPPER; import static eu.dissco.core.digitalspecimenprocessor.utils.TestUtils.PHYSICAL_SPECIMEN_ID; +import static eu.dissco.core.digitalspecimenprocessor.utils.TestUtils.SECOND_HANDLE; +import static eu.dissco.core.digitalspecimenprocessor.utils.TestUtils.THIRD_HANDLE; import static eu.dissco.core.digitalspecimenprocessor.utils.TestUtils.givenDigitalSpecimenRecord; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.mockStatic; +import eu.dissco.core.digitalspecimenprocessor.exception.DisscoRepositoryException; import java.sql.BatchUpdateException; import java.time.Instant; import java.util.List; @@ -38,7 +41,7 @@ void destroy() { } @Test - void testGetDigitalSpecimensEmpty() { + void testGetDigitalSpecimensEmpty() throws DisscoRepositoryException { // Given // When @@ -49,7 +52,7 @@ void testGetDigitalSpecimensEmpty() { } @Test - void testGetDigitalSpecimens() { + void testGetDigitalSpecimens() throws DisscoRepositoryException { // Given repository.createDigitalSpecimenRecord( List.of( @@ -86,8 +89,8 @@ void testUpdateLastChecked() { repository.createDigitalSpecimenRecord( List.of( givenDigitalSpecimenRecord(), - givenDigitalSpecimenRecord("20.5000.1025/XXX-XXX-XXX", "TEST_1"), - givenDigitalSpecimenRecord("20.5000.1025/YYY-YYY-YYY", "TEST_2"))); + givenDigitalSpecimenRecord(SECOND_HANDLE, "TEST_1"), + givenDigitalSpecimenRecord(THIRD_HANDLE, "TEST_2"))); // When try (MockedStatic mockedStatic = mockStatic(Instant.class)) { @@ -107,8 +110,8 @@ void testInsertDuplicateSpecimens() { // Given var records = List.of( givenDigitalSpecimenRecord(), - givenDigitalSpecimenRecord("20.5000.1025/XXX-XXX-XXX", "TEST_1"), - givenDigitalSpecimenRecord("20.5000.1025/XXX-XXX-XXX", "TEST_2")); + givenDigitalSpecimenRecord(SECOND_HANDLE, "TEST_1"), + givenDigitalSpecimenRecord(SECOND_HANDLE, "TEST_2")); // When var exception = 
assertThrows(DataAccessException.class, () -> { @@ -120,4 +123,39 @@ void testInsertDuplicateSpecimens() { PSQLException.class); } + @Test + void testRollbackSpecimen() throws DisscoRepositoryException { + // Given + repository.createDigitalSpecimenRecord( + List.of( + givenDigitalSpecimenRecord(), + givenDigitalSpecimenRecord(SECOND_HANDLE, "TEST_1"), + givenDigitalSpecimenRecord(THIRD_HANDLE, "TEST_2"))); + + // When + repository.rollbackSpecimen(HANDLE); + + // Then + var result = repository.getDigitalSpecimens(List.of(PHYSICAL_SPECIMEN_ID)); + assertThat(result).isEmpty(); + } + + @Test + void testRollbackVersion() throws DisscoRepositoryException { + // Given + repository.createDigitalSpecimenRecord( + List.of( + givenDigitalSpecimenRecord(), + givenDigitalSpecimenRecord("20.5000.1025/XXX-XXX-XXX", "TEST_1"), + givenDigitalSpecimenRecord("20.5000.1025/YYY-YYY-YYY", "TEST_2"))); + repository.createDigitalSpecimenRecord(List.of(givenDigitalSpecimenRecord(2))); + + // When + repository.deleteVersion(givenDigitalSpecimenRecord(2)); + + // Then + var result = repository.getDigitalSpecimens(List.of(PHYSICAL_SPECIMEN_ID)); + assertThat(result.get(0)).isEqualTo(givenDigitalSpecimenRecord()); + } + } diff --git a/src/test/java/eu/dissco/core/digitalspecimenprocessor/repository/HandleRepositoryIT.java b/src/test/java/eu/dissco/core/digitalspecimenprocessor/repository/HandleRepositoryIT.java index 04b97cb..a1e8a00 100644 --- a/src/test/java/eu/dissco/core/digitalspecimenprocessor/repository/HandleRepositoryIT.java +++ b/src/test/java/eu/dissco/core/digitalspecimenprocessor/repository/HandleRepositoryIT.java @@ -49,7 +49,7 @@ void testUpdateHandleAttributes(){ "anotherLicenseType".getBytes(StandardCharsets.UTF_8)); // When - repository.updateHandleAttributes(HANDLE, CREATED, List.of(updatedHandle)); + repository.updateHandleAttributes(HANDLE, CREATED, List.of(updatedHandle), true); // Then var result = context.select(HANDLES.DATA) @@ -60,6 +60,41 @@ void 
testUpdateHandleAttributes(){ assertThat(result).isEqualTo("2".getBytes(StandardCharsets.UTF_8)); } + @Test + void testRollbackVersion(){ + // Given + var handleAttributes = givenHandleAttributes(); + repository.createHandle(HANDLE, CREATED, handleAttributes); + var updatedHandle = new HandleAttribute(11, "pidKernelMetadataLicense", + "anotherLicenseType".getBytes(StandardCharsets.UTF_8)); + repository.updateHandleAttributes(HANDLE, CREATED, List.of(updatedHandle), true); + // When + + repository.updateHandleAttributes(HANDLE, CREATED, handleAttributes, false); + + // Then + var result = context.select(HANDLES.DATA) + .from(HANDLES) + .where(HANDLES.HANDLE.eq(HANDLE.getBytes(StandardCharsets.UTF_8))) + .and(HANDLES.TYPE.eq("issueNumber".getBytes(StandardCharsets.UTF_8))) + .fetchOne(Record1::value1); + assertThat(result).isEqualTo("1".getBytes(StandardCharsets.UTF_8)); + } + + @Test + void testRollbackHandle() { + // Given + var handleAttributes = givenHandleAttributes(); + repository.createHandle(HANDLE, CREATED, handleAttributes); + + // When + repository.rollbackHandleCreation(HANDLE); + + // Then + var handles = context.selectFrom(HANDLES).fetch(); + assertThat(handles).isEmpty(); + } + private List givenHandleAttributes() { return List.of( new HandleAttribute(1, "pid", diff --git a/src/test/java/eu/dissco/core/digitalspecimenprocessor/service/HandleServiceTest.java b/src/test/java/eu/dissco/core/digitalspecimenprocessor/service/HandleServiceTest.java index 2746db5..e91fb27 100644 --- a/src/test/java/eu/dissco/core/digitalspecimenprocessor/service/HandleServiceTest.java +++ b/src/test/java/eu/dissco/core/digitalspecimenprocessor/service/HandleServiceTest.java @@ -3,25 +3,32 @@ import static eu.dissco.core.digitalspecimenprocessor.utils.TestUtils.CREATED; import static eu.dissco.core.digitalspecimenprocessor.utils.TestUtils.HANDLE; import static eu.dissco.core.digitalspecimenprocessor.utils.TestUtils.MAPPER; +import static 
eu.dissco.core.digitalspecimenprocessor.utils.TestUtils.PHYSICAL_SPECIMEN_ID; import static eu.dissco.core.digitalspecimenprocessor.utils.TestUtils.givenDigitalSpecimen; +import static eu.dissco.core.digitalspecimenprocessor.utils.TestUtils.givenDigitalSpecimenEvent; +import static eu.dissco.core.digitalspecimenprocessor.utils.TestUtils.givenDigitalSpecimenRecord; import static eu.dissco.core.digitalspecimenprocessor.utils.TestUtils.givenUnequalDigitalSpecimenRecord; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.then; import static org.mockito.Mockito.mockStatic; +import eu.dissco.core.digitalspecimenprocessor.domain.HandleAttribute; import eu.dissco.core.digitalspecimenprocessor.domain.UpdatedDigitalSpecimenTuple; import eu.dissco.core.digitalspecimenprocessor.repository.HandleRepository; import java.time.Clock; import java.time.Instant; import java.time.ZoneOffset; +import java.util.ArrayList; import java.util.List; import java.util.Random; import javax.xml.parsers.DocumentBuilderFactory; import javax.xml.parsers.ParserConfigurationException; import javax.xml.transform.TransformerException; +import javax.xml.transform.TransformerFactory; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -44,7 +51,9 @@ class HandleServiceTest { @BeforeEach void setup() throws ParserConfigurationException { var docFactory = DocumentBuilderFactory.newInstance(); - service = new HandleService(random, MAPPER, docFactory.newDocumentBuilder(), repository); + var transfactory = TransformerFactory.newInstance(); + service = new HandleService(random, MAPPER, docFactory.newDocumentBuilder(), repository, + transfactory); Clock clock = Clock.fixed(CREATED, ZoneOffset.UTC); Instant 
instant = Instant.now(clock); mockedStatic = mockStatic(Instant.class); @@ -77,10 +86,33 @@ void testUpdateHandle() { // When service.updateHandles(List.of( new UpdatedDigitalSpecimenTuple(givenUnequalDigitalSpecimenRecord(), - givenDigitalSpecimen()))); + givenDigitalSpecimenEvent()))); // Then - then(repository).should().updateHandleAttributes(eq(HANDLE), eq(CREATED), anyList()); + then(repository).should().updateHandleAttributes(eq(HANDLE), eq(CREATED), anyList(), eq(true)); } + @Test + void testRollbackHandleCreation() { + // Given + + // When + service.rollbackHandleCreation(givenDigitalSpecimenRecord()); + + // Then + then(repository).should().rollbackHandleCreation(HANDLE); + } + + @Test + void testDeleteVersion() { + // Given + + // When + service.deleteVersion(givenDigitalSpecimenRecord()); + + // Then + then(repository).should().updateHandleAttributes(eq(HANDLE), eq(CREATED), anyList(), eq(false)); + } + + } diff --git a/src/test/java/eu/dissco/core/digitalspecimenprocessor/service/KafkaConsumerServiceTest.java b/src/test/java/eu/dissco/core/digitalspecimenprocessor/service/KafkaConsumerServiceTest.java index f1ed753..546e47e 100644 --- a/src/test/java/eu/dissco/core/digitalspecimenprocessor/service/KafkaConsumerServiceTest.java +++ b/src/test/java/eu/dissco/core/digitalspecimenprocessor/service/KafkaConsumerServiceTest.java @@ -16,12 +16,14 @@ class KafkaConsumerServiceTest { @Mock private ProcessingService processingService; + @Mock + private KafkaPublisherService publisherService; private KafkaConsumerService service; @BeforeEach void setup() { - service = new KafkaConsumerService(MAPPER, processingService); + service = new KafkaConsumerService(MAPPER, processingService, publisherService); } @Test diff --git a/src/test/java/eu/dissco/core/digitalspecimenprocessor/service/KafkaPublisherServiceTest.java b/src/test/java/eu/dissco/core/digitalspecimenprocessor/service/KafkaPublisherServiceTest.java index e2763f4..a571280 100644 --- 
a/src/test/java/eu/dissco/core/digitalspecimenprocessor/service/KafkaPublisherServiceTest.java +++ b/src/test/java/eu/dissco/core/digitalspecimenprocessor/service/KafkaPublisherServiceTest.java @@ -2,12 +2,14 @@ import static eu.dissco.core.digitalspecimenprocessor.utils.TestUtils.AAS; import static eu.dissco.core.digitalspecimenprocessor.utils.TestUtils.MAPPER; +import static eu.dissco.core.digitalspecimenprocessor.utils.TestUtils.givenDigitalSpecimenEvent; import static eu.dissco.core.digitalspecimenprocessor.utils.TestUtils.givenDigitalSpecimenRecord; import static eu.dissco.core.digitalspecimenprocessor.utils.TestUtils.givenUnequalDigitalSpecimenRecord; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.then; +import com.fasterxml.jackson.core.JsonProcessingException; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -30,7 +32,7 @@ void setup() { } @Test - void testPublishCreateEvent() { + void testPublishCreateEvent() throws JsonProcessingException { // Given // When @@ -41,7 +43,7 @@ void testPublishCreateEvent() { } @Test - void testPublishAnnotationRequestEvent() { + void testPublishAnnotationRequestEvent() throws JsonProcessingException { // Given // When @@ -52,7 +54,7 @@ void testPublishAnnotationRequestEvent() { } @Test - void testPublishUpdateEvent() { + void testPublishUpdateEvent() throws JsonProcessingException { // Given // When @@ -62,4 +64,39 @@ void testPublishUpdateEvent() { then(kafkaTemplate).should().send(eq("createUpdateDeleteTopic"), anyString()); } + @Test + void testRepublishEvent() throws JsonProcessingException { + // Given + + // When + service.republishEvent(givenDigitalSpecimenEvent()); + + // Then + then(kafkaTemplate).should() + .send("digital-specimen", MAPPER.writeValueAsString(givenDigitalSpecimenEvent())); + } + + @Test + void testDeadLetterEvent() throws 
JsonProcessingException { + // Given + + // When + service.deadLetterEvent(givenDigitalSpecimenEvent()); + + // Then + then(kafkaTemplate).should() + .send("digital-specimen-dlq", MAPPER.writeValueAsString(givenDigitalSpecimenEvent())); + } + + @Test + void testDeadLetterRaw() throws JsonProcessingException { + // Given + var rawEvent = MAPPER.writeValueAsString(givenDigitalSpecimenEvent()); + + // When + service.deadLetterRaw(rawEvent); + + // Then + then(kafkaTemplate).should().send("digital-specimen-dlq", rawEvent); + } } diff --git a/src/test/java/eu/dissco/core/digitalspecimenprocessor/service/ProcessingServiceTest.java b/src/test/java/eu/dissco/core/digitalspecimenprocessor/service/ProcessingServiceTest.java index f48ddc9..2b3fad1 100644 --- a/src/test/java/eu/dissco/core/digitalspecimenprocessor/service/ProcessingServiceTest.java +++ b/src/test/java/eu/dissco/core/digitalspecimenprocessor/service/ProcessingServiceTest.java @@ -1,20 +1,38 @@ package eu.dissco.core.digitalspecimenprocessor.service; import static eu.dissco.core.digitalspecimenprocessor.utils.TestUtils.AAS; +import static eu.dissco.core.digitalspecimenprocessor.utils.TestUtils.ANOTHER_ORGANISATION; +import static eu.dissco.core.digitalspecimenprocessor.utils.TestUtils.ANOTHER_SPECIMEN_NAME; import static eu.dissco.core.digitalspecimenprocessor.utils.TestUtils.CREATED; import static eu.dissco.core.digitalspecimenprocessor.utils.TestUtils.HANDLE; import static eu.dissco.core.digitalspecimenprocessor.utils.TestUtils.PHYSICAL_SPECIMEN_ID; +import static eu.dissco.core.digitalspecimenprocessor.utils.TestUtils.SECOND_HANDLE; +import static eu.dissco.core.digitalspecimenprocessor.utils.TestUtils.THIRD_HANDLE; +import static eu.dissco.core.digitalspecimenprocessor.utils.TestUtils.givenDifferentUnequalSpecimen; import static eu.dissco.core.digitalspecimenprocessor.utils.TestUtils.givenDigitalSpecimen; import static eu.dissco.core.digitalspecimenprocessor.utils.TestUtils.givenDigitalSpecimenEvent; import 
static eu.dissco.core.digitalspecimenprocessor.utils.TestUtils.givenDigitalSpecimenRecord; import static eu.dissco.core.digitalspecimenprocessor.utils.TestUtils.givenUnequalDigitalSpecimenRecord; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anySet; import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.then; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.times; +import co.elastic.clients.elasticsearch._types.ErrorCause; import co.elastic.clients.elasticsearch.core.BulkResponse; +import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem; +import com.fasterxml.jackson.core.JsonProcessingException; +import eu.dissco.core.digitalspecimenprocessor.domain.DigitalSpecimen; +import eu.dissco.core.digitalspecimenprocessor.domain.DigitalSpecimenEvent; import eu.dissco.core.digitalspecimenprocessor.domain.DigitalSpecimenRecord; +import eu.dissco.core.digitalspecimenprocessor.domain.UpdatedDigitalSpecimenTuple; +import eu.dissco.core.digitalspecimenprocessor.exception.DisscoRepositoryException; import eu.dissco.core.digitalspecimenprocessor.repository.DigitalSpecimenRepository; import eu.dissco.core.digitalspecimenprocessor.repository.ElasticSearchRepository; import java.io.IOException; @@ -64,13 +82,14 @@ void destroy() { } @Test - void testEqualSpecimen() { + void testEqualSpecimen() throws DisscoRepositoryException { // Given given(repository.getDigitalSpecimens(List.of(PHYSICAL_SPECIMEN_ID))).willReturn( List.of(givenDigitalSpecimenRecord())); // When - List result = service.handleMessages(List.of(givenDigitalSpecimenEvent())); + List result = service.handleMessages( + List.of(givenDigitalSpecimenEvent())); // Then 
then(repository).should().updateLastChecked(List.of(HANDLE)); @@ -78,9 +97,9 @@ void testEqualSpecimen() { } @Test - void testUnequalSpecimen() throws IOException { + void testUnequalSpecimen() throws IOException, DisscoRepositoryException { // Given - var expected = Set.of(givenDigitalSpecimenRecord(2)); + var expected = List.of(givenDigitalSpecimenRecord(2)); given(repository.getDigitalSpecimens(List.of(PHYSICAL_SPECIMEN_ID))).willReturn( List.of(givenUnequalDigitalSpecimenRecord())); given(bulkResponse.errors()).willReturn(false); @@ -100,7 +119,7 @@ void testUnequalSpecimen() throws IOException { } @Test - void testNewSpecimen() throws TransformerException, IOException { + void testNewSpecimen() throws TransformerException, IOException, DisscoRepositoryException { // Given given(repository.getDigitalSpecimens(List.of(PHYSICAL_SPECIMEN_ID))).willReturn(List.of()); given(handleService.createNewHandle(givenDigitalSpecimen())).willReturn(HANDLE); @@ -120,7 +139,192 @@ void testNewSpecimen() throws TransformerException, IOException { } @Test - void testNewSpecimenError() throws TransformerException { + void testDuplicateNewSpecimen() + throws TransformerException, IOException, DisscoRepositoryException { + // Given + var duplicateSpecimen = new DigitalSpecimenEvent(List.of(AAS), + givenDigitalSpecimen(PHYSICAL_SPECIMEN_ID, ANOTHER_SPECIMEN_NAME, ANOTHER_ORGANISATION)); + given(repository.getDigitalSpecimens(List.of(PHYSICAL_SPECIMEN_ID))).willReturn(List.of()); + given(handleService.createNewHandle(givenDigitalSpecimen())).willReturn(HANDLE); + given(bulkResponse.errors()).willReturn(false); + given( + elasticRepository.indexDigitalSpecimen(Set.of(givenDigitalSpecimenRecord()))).willReturn( + bulkResponse); + + // When + var result = service.handleMessages( + List.of(givenDigitalSpecimenEvent(), duplicateSpecimen)); + + // Then + then(repository).should().createDigitalSpecimenRecord(Set.of(givenDigitalSpecimenRecord())); + 
then(kafkaService).should().publishCreateEvent(givenDigitalSpecimenRecord()); + then(kafkaService).should().publishAnnotationRequestEvent(AAS, givenDigitalSpecimenRecord()); + then(kafkaService).should().republishEvent(duplicateSpecimen); + assertThat(result).isEqualTo(List.of(givenDigitalSpecimenRecord())); + } + + @Test + void testNewSpecimenIOException() + throws TransformerException, IOException, DisscoRepositoryException { + // Given + given(repository.getDigitalSpecimens(List.of(PHYSICAL_SPECIMEN_ID))).willReturn(List.of()); + given(handleService.createNewHandle(givenDigitalSpecimen())).willReturn(HANDLE); + given( + elasticRepository.indexDigitalSpecimen(Set.of(givenDigitalSpecimenRecord()))).willThrow( + IOException.class); + + // When + var result = service.handleMessages(List.of(givenDigitalSpecimenEvent())); + + // Then + then(repository).should().createDigitalSpecimenRecord(Set.of(givenDigitalSpecimenRecord())); + then(repository).should().rollbackSpecimen(givenDigitalSpecimenRecord().id()); + then(handleService).should().rollbackHandleCreation(givenDigitalSpecimenRecord()); + then(kafkaService).should().deadLetterEvent(givenDigitalSpecimenEvent()); + assertThat(result).isEmpty(); + } + + @Test + void testNewSpecimenPartialElasticFailed() + throws TransformerException, IOException, DisscoRepositoryException { + // Given + var secondEvent = givenDigitalSpecimenEvent("Another Specimen"); + var secondSpecimen = givenDigitalSpecimenRecord(SECOND_HANDLE, "Another Specimen"); + var thirdEvent = givenDigitalSpecimenEvent("A third Specimen"); + var thirdSpecimen = givenDigitalSpecimenRecord(THIRD_HANDLE, "A third Specimen"); + given(repository.getDigitalSpecimens(anyList())).willReturn(List.of()); + given(handleService.createNewHandle(any(DigitalSpecimen.class))).willReturn(THIRD_HANDLE) + .willReturn(SECOND_HANDLE).willReturn(HANDLE); + + givenBulkResponse(); + given(elasticRepository.indexDigitalSpecimen(anySet())).willReturn(bulkResponse); + + // When + var 
result = service.handleMessages( + List.of(givenDigitalSpecimenEvent(), secondEvent, thirdEvent)); + + // Then + then(repository).should().createDigitalSpecimenRecord(anySet()); + then(handleService).should(times(3)).createNewHandle(any(DigitalSpecimen.class)); + then(repository).should().rollbackSpecimen(secondSpecimen.id()); + then(handleService).should().rollbackHandleCreation(secondSpecimen); + then(kafkaService).should().deadLetterEvent(secondEvent); + assertThat(result).isEqualTo(List.of(thirdSpecimen, givenDigitalSpecimenRecord())); + } + + @Test + void testNewSpecimenKafkaFailed() + throws DisscoRepositoryException, TransformerException, IOException { + given(repository.getDigitalSpecimens(List.of(PHYSICAL_SPECIMEN_ID))).willReturn(List.of()); + given(handleService.createNewHandle(givenDigitalSpecimen())).willReturn(HANDLE); + + given(bulkResponse.errors()).willReturn(false); + given( + elasticRepository.indexDigitalSpecimen(Set.of(givenDigitalSpecimenRecord()))).willReturn( + bulkResponse); + doThrow(JsonProcessingException.class).when(kafkaService) + .publishCreateEvent(any(DigitalSpecimenRecord.class)); + + // When + var result = service.handleMessages(List.of(givenDigitalSpecimenEvent())); + + // Then + then(repository).should().createDigitalSpecimenRecord(anySet()); + then(elasticRepository).should().rollbackSpecimen(givenDigitalSpecimenRecord()); + then(repository).should().rollbackSpecimen(givenDigitalSpecimenRecord().id()); + then(handleService).should().rollbackHandleCreation(givenDigitalSpecimenRecord()); + then(kafkaService).should().deadLetterEvent(givenDigitalSpecimenEvent()); + assertThat(result).isEmpty(); + } + + private void givenBulkResponse() { + var positiveResponse = mock(BulkResponseItem.class); + given(positiveResponse.error()).willReturn(null); + given(positiveResponse.id()).willReturn(HANDLE).willReturn(THIRD_HANDLE); + var negativeResponse = mock(BulkResponseItem.class); + given(negativeResponse.error()).willReturn(new 
ErrorCause.Builder().reason("Crashed").build()); + given(negativeResponse.id()).willReturn(SECOND_HANDLE); + given(bulkResponse.errors()).willReturn(true); + given(bulkResponse.items()).willReturn( + List.of(positiveResponse, negativeResponse, positiveResponse)); + } + + @Test + void testUpdateSpecimenPartialElasticFailed() throws IOException, DisscoRepositoryException { + // Given + var secondEvent = givenDigitalSpecimenEvent("Another Specimen"); + var thirdEvent = givenDigitalSpecimenEvent("A third Specimen"); + given(repository.getDigitalSpecimens( + List.of("A third Specimen", "Another Specimen", PHYSICAL_SPECIMEN_ID))) + .willReturn(List.of(givenUnequalDigitalSpecimenRecord(), + givenDifferentUnequalSpecimen(SECOND_HANDLE, "Another Specimen"), + givenDifferentUnequalSpecimen(THIRD_HANDLE, "A third Specimen"))); + + givenBulkResponse(); + given(elasticRepository.indexDigitalSpecimen(anyList())).willReturn(bulkResponse); + + // When + var result = service.handleMessages( + List.of(givenDigitalSpecimenEvent(), secondEvent, thirdEvent)); + + // Then + then(handleService).should().updateHandles(anyList()); + then(repository).should().createDigitalSpecimenRecord(anyList()); + then(handleService).should() + .deleteVersion(givenDifferentUnequalSpecimen(SECOND_HANDLE, "Another Specimen")); + then(kafkaService).should().deadLetterEvent(secondEvent); + assertThat(result).hasSize(2); + } + + @Test + void testUpdateSpecimenKafkaFailed() throws DisscoRepositoryException, IOException { + given(repository.getDigitalSpecimens(List.of(PHYSICAL_SPECIMEN_ID))).willReturn( + List.of(givenUnequalDigitalSpecimenRecord())); + + given(bulkResponse.errors()).willReturn(false); + given( + elasticRepository.indexDigitalSpecimen(List.of(givenDigitalSpecimenRecord(2)))).willReturn( + bulkResponse); + doThrow(JsonProcessingException.class).when(kafkaService) + .publishUpdateEvent(givenDigitalSpecimenRecord(2), givenUnequalDigitalSpecimenRecord()); + + // When + var result = 
service.handleMessages(List.of(givenDigitalSpecimenEvent())); + + // Then + then(repository).should().createDigitalSpecimenRecord(anyList()); + then(elasticRepository).should().rollbackVersion(givenUnequalDigitalSpecimenRecord()); + then(repository).should().deleteVersion(givenDigitalSpecimenRecord(2)); + then(kafkaService).should().deadLetterEvent(givenDigitalSpecimenEvent()); + assertThat(result).isEmpty(); + } + + @Test + void testUpdateSpecimenIOException() throws IOException, DisscoRepositoryException { + // Given + var unequalCurrentDigitalSpecimen = givenUnequalDigitalSpecimenRecord(ANOTHER_ORGANISATION); + given(repository.getDigitalSpecimens(List.of(PHYSICAL_SPECIMEN_ID))).willReturn( + List.of(unequalCurrentDigitalSpecimen)); + given( + elasticRepository.indexDigitalSpecimen(List.of(givenDigitalSpecimenRecord(2)))).willThrow( + IOException.class); + + // When + var result = service.handleMessages(List.of(givenDigitalSpecimenEvent())); + + // Then + then(handleService).should().updateHandles(List.of( + new UpdatedDigitalSpecimenTuple(unequalCurrentDigitalSpecimen, + givenDigitalSpecimenEvent()))); + then(repository).should().createDigitalSpecimenRecord(List.of(givenDigitalSpecimenRecord(2))); + then(repository).should().deleteVersion(givenDigitalSpecimenRecord(2)); + then(handleService).should().deleteVersion(unequalCurrentDigitalSpecimen); + then(kafkaService).should().deadLetterEvent(givenDigitalSpecimenEvent()); + assertThat(result).isEmpty(); + } + + @Test + void testNewSpecimenError() throws TransformerException, DisscoRepositoryException { // Given given(repository.getDigitalSpecimens(List.of(PHYSICAL_SPECIMEN_ID))).willReturn(List.of()); given(handleService.createNewHandle(givenDigitalSpecimen())).willThrow( @@ -135,4 +339,21 @@ void testNewSpecimenError() throws TransformerException { assertThat(result).isEmpty(); } + @Test + void testFailedToRetrieveCurrentSpecimen() + throws DisscoRepositoryException, JsonProcessingException { + // Given + 
given(repository.getDigitalSpecimens(List.of(PHYSICAL_SPECIMEN_ID))).willThrow( + DisscoRepositoryException.class); + + // When + var result = service.handleMessages(List.of(givenDigitalSpecimenEvent())); + + // Then + assertThat(result).isEmpty(); + then(kafkaService).should().republishEvent(givenDigitalSpecimenEvent()); + then(kafkaService).shouldHaveNoMoreInteractions(); + then(handleService).shouldHaveNoInteractions(); + } + } diff --git a/src/test/java/eu/dissco/core/digitalspecimenprocessor/utils/TestUtils.java b/src/test/java/eu/dissco/core/digitalspecimenprocessor/utils/TestUtils.java index f905116..e889200 100644 --- a/src/test/java/eu/dissco/core/digitalspecimenprocessor/utils/TestUtils.java +++ b/src/test/java/eu/dissco/core/digitalspecimenprocessor/utils/TestUtils.java @@ -13,6 +13,8 @@ public class TestUtils { public static ObjectMapper MAPPER = new ObjectMapper().findAndRegisterModules(); public static String HANDLE = "20.5000.1025/V1Z-176-LL4"; + public static String SECOND_HANDLE = "20.5000.1025/XXX-XXX-XXX"; + public static String THIRD_HANDLE = "20.5000.1025/YYY-YYY-YYY"; public static int MIDS_LEVEL = 1; public static int VERSION = 1; public static Instant CREATED = Instant.parse("2022-11-01T09:59:24.00Z"); @@ -21,7 +23,9 @@ public class TestUtils { public static String PHYSICAL_SPECIMEN_ID = "https://geocollections.info/specimen/23602"; public static String PHYSICAL_SPECIMEN_TYPE = "cetaf"; public static String SPECIMEN_NAME = "Biota"; + public static String ANOTHER_SPECIMEN_NAME = "Another SpecimenName"; public static String ORGANIZATION_ID = "https://ror.org/0443cwa12"; + public static String ANOTHER_ORGANISATION = "Another organisation"; public static String DATASET_ID = null; public static String PHYSICAL_SPECIMEN_COLLECTION = null; public static String SOURCE_SYSTEM_ID = "20.5000.1025/MN0-5XP-FFD"; @@ -121,25 +125,53 @@ public static DigitalSpecimenRecord givenDigitalSpecimenRecord() { } public static DigitalSpecimenRecord 
givenUnequalDigitalSpecimenRecord() { + return givenUnequalDigitalSpecimenRecord(HANDLE, ANOTHER_SPECIMEN_NAME, ORGANIZATION_ID); + } + + public static DigitalSpecimenRecord givenUnequalDigitalSpecimenRecord(String organisation) { + return givenUnequalDigitalSpecimenRecord(HANDLE, ANOTHER_SPECIMEN_NAME, organisation); + } + + public static DigitalSpecimenRecord givenUnequalDigitalSpecimenRecord(String handle, + String specimenName, String organisation) { return new DigitalSpecimenRecord( - HANDLE, + handle, MIDS_LEVEL, VERSION, CREATED, - givenDigitalSpecimen(PHYSICAL_SPECIMEN_ID, "Another SpecimenName") + givenDigitalSpecimen(PHYSICAL_SPECIMEN_ID, specimenName, organisation) ); } - public static DigitalSpecimenRecord givenDigitalSpecimenRecord(String handle, String physicalSpecimenId) { + public static DigitalSpecimenRecord givenDifferentUnequalSpecimen(String handle, + String physicalIdentifier) { return new DigitalSpecimenRecord( handle, MIDS_LEVEL, VERSION, CREATED, - givenDigitalSpecimen(physicalSpecimenId, SPECIMEN_NAME) + givenDigitalSpecimen(physicalIdentifier, ANOTHER_SPECIMEN_NAME, ANOTHER_ORGANISATION)); + } + + public static DigitalSpecimenRecord givenDigitalSpecimenRecord(String handle, + String physicalSpecimenId) { + return new DigitalSpecimenRecord( + handle, + MIDS_LEVEL, + VERSION, + CREATED, + givenDigitalSpecimen(physicalSpecimenId, SPECIMEN_NAME, ORGANIZATION_ID) ); } + public static DigitalSpecimenEvent givenDigitalSpecimenEvent(String physicalSpecimenId) { + return new DigitalSpecimenEvent( + List.of(AAS), + givenDigitalSpecimen(physicalSpecimenId, SPECIMEN_NAME, ORGANIZATION_ID) + ); + } + + public static DigitalSpecimenEvent givenDigitalSpecimenEvent() { return new DigitalSpecimenEvent( List.of(AAS), @@ -148,16 +180,17 @@ public static DigitalSpecimenEvent givenDigitalSpecimenEvent() { } public static DigitalSpecimen givenDigitalSpecimen() { - return givenDigitalSpecimen(PHYSICAL_SPECIMEN_ID, SPECIMEN_NAME); + return 
givenDigitalSpecimen(PHYSICAL_SPECIMEN_ID, SPECIMEN_NAME, ORGANIZATION_ID); } - public static DigitalSpecimen givenDigitalSpecimen(String physicalSpecimenId, String specimenName) { + public static DigitalSpecimen givenDigitalSpecimen(String physicalSpecimenId, String specimenName, + String organisation) { return new DigitalSpecimen( TYPE, physicalSpecimenId, PHYSICAL_SPECIMEN_TYPE, specimenName, - ORGANIZATION_ID, + organisation, DATASET_ID, PHYSICAL_SPECIMEN_COLLECTION, SOURCE_SYSTEM_ID,