Skip to content

Commit

Permalink
Merge pull request #5 from DiSSCo/feature/add-batching
Browse files Browse the repository at this point in the history
Add batching to processing service
  • Loading branch information
samleeflang authored Nov 1, 2022
2 parents 2187517 + 4dd53d0 commit 6f3cedd
Show file tree
Hide file tree
Showing 18 changed files with 867 additions and 96 deletions.
8 changes: 7 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,16 @@
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<version>4.8.0</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<defaultGoal>clean validate</defaultGoal>
<defaultGoal>clean package</defaultGoal>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public ConsumerFactory<String, String> consumerFactory() {
props.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, properties.getBatchSize());
return new DefaultKafkaConsumerFactory<>(props);
}

Expand All @@ -45,6 +46,7 @@ public ConsumerFactory<String, String> consumerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
return factory;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import eu.dissco.core.digitalspecimenprocessor.domain.DigitalSpecimenRecord;
import eu.dissco.core.digitalspecimenprocessor.exception.NoChangesFoundException;
import eu.dissco.core.digitalspecimenprocessor.service.ProcessingService;
import java.util.List;
import javax.xml.transform.TransformerException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -31,10 +32,13 @@ public class DigitalSpecimenController {
@PreAuthorize("isAuthenticated()")
@PostMapping(consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<DigitalSpecimenRecord> upsertDigitalSpecimen(@RequestBody
DigitalSpecimenEvent event) throws TransformerException, NoChangesFoundException {
DigitalSpecimenEvent event) throws NoChangesFoundException {
log.info("Received digitalSpecimen upsert: {}", event);
var result = processingService.handleMessages(event);
return ResponseEntity.status(HttpStatus.CREATED).body(result);
var result = processingService.handleMessages(List.of(event));
if (result.isEmpty()){
throw new NoChangesFoundException("No changes found for specimen");
}
return ResponseEntity.status(HttpStatus.CREATED).body(result.get(0));
}

@ExceptionHandler(NoChangesFoundException.class)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package eu.dissco.core.digitalspecimenprocessor.domain;

import java.util.List;

/**
 * Immutable pairing of a {@link DigitalSpecimenRecord} with a list of enrichment identifiers.
 *
 * @param enrichmentList        enrichment names as plain strings — NOTE(review): presumably the
 *                              enrichment services to request for this specimen; confirm against
 *                              the publisher that consumes this event
 * @param digitalSpecimenRecord the specimen record the event is about
 */
public record DigitalSpecimenRecordEvent(
List<String> enrichmentList,
DigitalSpecimenRecord digitalSpecimenRecord) {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package eu.dissco.core.digitalspecimenprocessor.domain;

import java.util.List;

/**
 * Groups a set of specimens into three lists: equal, changed and new.
 *
 * <p>NOTE(review): presumably the outcome of comparing an incoming batch with the specimens
 * already stored; the producing code is not visible here — confirm against ProcessingService.
 *
 * @param equalSpecimens   existing records classified as equal (presumably: no changes found)
 * @param changedSpecimens pairs of the current record and the specimen that differs from it
 * @param newSpecimens     events classified as new (presumably: no existing record was found)
 */
public record ProcessResult(
List<DigitalSpecimenRecord> equalSpecimens,
List<UpdatedDigitalSpecimenTuple> changedSpecimens,
List<DigitalSpecimenEvent> newSpecimens) {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package eu.dissco.core.digitalspecimenprocessor.domain;

/**
 * Immutable pair of an existing specimen record and a digital specimen.
 *
 * @param currentSpecimen the existing record; its id is what identifies the specimen
 * @param digitalSpecimen the digital specimen paired with it — NOTE(review): presumably the
 *                        newly received, updated version; confirm against the caller
 */
public record UpdatedDigitalSpecimenTuple(DigitalSpecimenRecord currentSpecimen,
DigitalSpecimen digitalSpecimen) {

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import eu.dissco.core.digitalspecimenprocessor.Profiles;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.Positive;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Profile;
Expand All @@ -23,4 +24,7 @@ public class KafkaConsumerProperties {
@NotBlank
private String topic;

@Positive
private int batchSize = 5000;

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@
import eu.dissco.core.digitalspecimenprocessor.domain.DigitalSpecimen;
import eu.dissco.core.digitalspecimenprocessor.domain.DigitalSpecimenRecord;
import java.time.Instant;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.jooq.DSLContext;
import org.jooq.JSONB;
import org.jooq.Query;
import org.jooq.Record;
import org.springframework.stereotype.Repository;

Expand Down Expand Up @@ -55,7 +58,12 @@ private DigitalSpecimenRecord mapDigitalSpecimen(Record dbRecord) {
digitalSpecimen);
}

public int createDigitalSpecimenRecord(DigitalSpecimenRecord digitalSpecimenRecord) {
/**
 * Inserts all given specimen records with a single jOOQ batch.
 *
 * @param digitalSpecimenRecords the records to insert; each one becomes one insert statement
 * @return the array returned by the batch execution, one entry per statement
 */
public int[] createDigitalSpecimenRecord(Collection<DigitalSpecimenRecord> digitalSpecimenRecords) {
  var insertStatements = digitalSpecimenRecords.stream()
      .map(this::specimenToQuery)
      .toList();
  var batch = context.batch(insertStatements);
  return batch.execute();
}

private Query specimenToQuery(DigitalSpecimenRecord digitalSpecimenRecord) {
return context.insertInto(NEW_DIGITAL_SPECIMEN)
.set(NEW_DIGITAL_SPECIMEN.ID, digitalSpecimenRecord.id())
.set(NEW_DIGITAL_SPECIMEN.TYPE, digitalSpecimenRecord.digitalSpecimen().type())
Expand All @@ -79,13 +87,22 @@ public int createDigitalSpecimenRecord(DigitalSpecimenRecord digitalSpecimenReco
JSONB.valueOf(digitalSpecimenRecord.digitalSpecimen().data().toString()))
.set(NEW_DIGITAL_SPECIMEN.ORIGINAL_DATA,
JSONB.valueOf(digitalSpecimenRecord.digitalSpecimen().originalData().toString()))
.set(NEW_DIGITAL_SPECIMEN.DWCA_ID, digitalSpecimenRecord.digitalSpecimen().dwcaId())
.execute();
.set(NEW_DIGITAL_SPECIMEN.DWCA_ID, digitalSpecimenRecord.digitalSpecimen().dwcaId());
}

public int updateLastChecked(DigitalSpecimenRecord currentDigitalSpecimen) {
public int updateLastChecked(List<String> currentDigitalSpecimen) {
return context.update(NEW_DIGITAL_SPECIMEN)
.set(NEW_DIGITAL_SPECIMEN.LAST_CHECKED, Instant.now())
.where(NEW_DIGITAL_SPECIMEN.ID.eq(currentDigitalSpecimen.id())).execute();
.where(NEW_DIGITAL_SPECIMEN.ID.in(currentDigitalSpecimen))
.execute();
}

/**
 * Fetches the stored specimen records whose physical specimen id is in the given list.
 *
 * <p>Rows are made distinct on the record id and ordered by id and then version descending,
 * so for every id only the row with the highest version is mapped and returned.
 *
 * @param specimenList physical specimen ids to look up
 * @return the mapped records, one per record id
 */
public List<DigitalSpecimenRecord> getDigitalSpecimens(List<String> specimenList) {
  var matchesPhysicalSpecimenId = NEW_DIGITAL_SPECIMEN.PHYSICAL_SPECIMEN_ID.in(specimenList);
  var latestVersionPerId = context.select(NEW_DIGITAL_SPECIMEN.asterisk())
      .distinctOn(NEW_DIGITAL_SPECIMEN.ID)
      .from(NEW_DIGITAL_SPECIMEN)
      .where(matchesPhysicalSpecimenId)
      .orderBy(NEW_DIGITAL_SPECIMEN.ID, NEW_DIGITAL_SPECIMEN.VERSION.desc());
  return latestVersionPerId.fetch(this::mapDigitalSpecimen);
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package eu.dissco.core.digitalspecimenprocessor.repository;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.IndexResponse;
import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import eu.dissco.core.digitalspecimenprocessor.domain.DigitalSpecimenRecord;
import eu.dissco.core.digitalspecimenprocessor.property.ElasticSearchProperties;
import java.io.IOException;
import java.util.Collection;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Repository;

Expand All @@ -15,11 +17,17 @@ public class ElasticSearchRepository {
private final ElasticsearchClient client;
private final ElasticSearchProperties properties;

public IndexResponse indexDigitalSpecimen(DigitalSpecimenRecord digitalSpecimen) {
try {
return client.index(idx -> idx.index(properties.getIndexName()).id(digitalSpecimen.id()).document(digitalSpecimen));
} catch (IOException e) {
throw new RuntimeException(e);
/**
 * Indexes all given specimen records with a single Elasticsearch bulk request.
 *
 * <p>Each record is added as an index operation targeting the configured index name, using
 * the record id as the document id and the record itself as the document.
 *
 * @param digitalSpecimens the records to index
 * @return the response of the bulk call
 * @throws IOException when the bulk call to Elasticsearch fails
 */
public BulkResponse indexDigitalSpecimen(Collection<DigitalSpecimenRecord> digitalSpecimens)
    throws IOException {
  var request = new BulkRequest.Builder();
  digitalSpecimens.forEach(specimen -> request.operations(
      operation -> operation.index(
          indexOperation -> indexOperation
              .index(properties.getIndexName())
              .id(specimen.id())
              .document(specimen))));
  return client.bulk(request.build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dissco.core.digitalspecimenprocessor.domain.DigitalSpecimen;
import eu.dissco.core.digitalspecimenprocessor.domain.HandleAttribute;
import eu.dissco.core.digitalspecimenprocessor.domain.UpdatedDigitalSpecimenTuple;
import eu.dissco.core.digitalspecimenprocessor.repository.HandleRepository;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -30,7 +31,7 @@ public class HandleService {

private static final String PREFIX = "20.5000.1025/";
private final Random random;
private final char[] symbols = "ABCDEFGHJKLMNPQRSTUVWXYZ1234567890".toCharArray();
private final char[] symbols = "ABCDEFGHJKLMNPQRSTVWXYZ1234567890".toCharArray();
private final char[] buffer = new char[11];
private final ObjectMapper mapper;
private final DocumentBuilder documentBuilder;
Expand Down Expand Up @@ -144,7 +145,7 @@ private int toDigit(char hexChar) {
return digit;
}

public void updateHandle(String id, DigitalSpecimen digitalSpecimen) {
private void updateHandle(String id, DigitalSpecimen digitalSpecimen) {
var handleAttributes = updatedHandles(digitalSpecimen);
var recordTimestamp = Instant.now();
repository.updateHandleAttributes(id, recordTimestamp, handleAttributes);
Expand All @@ -158,4 +159,10 @@ private List<HandleAttribute> updatedHandles(DigitalSpecimen digitalSpecimen) {
digitalSpecimen.organizationId(), "ROR", "Needs to be fixed!")));
return handleAttributes;
}

/**
 * Updates the handle of every specimen in the given list, one after another.
 *
 * @param handleUpdates pairs of the current record (supplies the handle id) and the digital
 *                      specimen from which the updated handle attributes are derived
 */
public void updateHandles(List<UpdatedDigitalSpecimenTuple> handleUpdates) {
  handleUpdates.forEach(
      update -> updateHandle(update.currentSpecimen().id(), update.digitalSpecimen()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dissco.core.digitalspecimenprocessor.Profiles;
import eu.dissco.core.digitalspecimenprocessor.domain.DigitalSpecimenEvent;
import eu.dissco.core.digitalspecimenprocessor.exception.NoChangesFoundException;
import javax.xml.transform.TransformerException;
import java.util.List;
import java.util.Objects;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Profile;
Expand All @@ -23,14 +23,17 @@ public class KafkaConsumerService {
private final ProcessingService processingService;

@KafkaListener(topics = "${kafka.consumer.topic}")
public void getMessages(@Payload String message)
throws JsonProcessingException, TransformerException {
var event = mapper.readValue(message, DigitalSpecimenEvent.class);
try {
processingService.handleMessages(event);
} catch (NoChangesFoundException e) {
log.info(e.getMessage());
}
// Receives a batch of raw Kafka messages, deserializes each one into a DigitalSpecimenEvent
// and hands all successfully parsed events to the processing service in a single call.
// Messages that cannot be parsed are logged and left out of the batch, so one malformed
// message does not block the rest.
public void getMessages(@Payload List<String> messages) {
  var events = messages.stream()
      .map(this::parseEvent)
      .filter(Objects::nonNull)
      .toList();
  processingService.handleMessages(events);
}

/**
 * Deserializes a single raw message into a {@link DigitalSpecimenEvent}.
 *
 * @param message the raw JSON payload
 * @return the parsed event, or {@code null} when the payload is not valid JSON for the event
 *         type; the caller filters these out
 */
private DigitalSpecimenEvent parseEvent(String message) {
  try {
    return mapper.readValue(message, DigitalSpecimenEvent.class);
  } catch (JsonProcessingException e) {
    // Log the payload itself so the offending message can be identified and replayed.
    log.error("Failed to process message: {}", message, e);
    // TODO move message to DLQ
    return null;
  }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ public void publishAnnotationRequestEvent(String enrichment,
}
}

public void publishUpdateEvent(DigitalSpecimenRecord currentDigitalSpecimen,
DigitalSpecimenRecord digitalSpecimenRecord) {
public void publishUpdateEvent(DigitalSpecimenRecord digitalSpecimenRecord,
DigitalSpecimenRecord currentDigitalSpecimen) {
var jsonPatch = createJsonPatch(currentDigitalSpecimen, digitalSpecimenRecord);
var event = new CreateUpdateDeleteEvent(UUID.randomUUID(), "update", "processing-service",
digitalSpecimenRecord.id(), Instant.now(), jsonPatch,
Expand Down
Loading

0 comments on commit 6f3cedd

Please sign in to comment.