Skip to content

Commit

Permalink
Merge pull request #55 from DiSSCo/feature/topic-configurable
Browse files Browse the repository at this point in the history
Make the queue configurable for kafka
  • Loading branch information
samleeflang authored Feb 22, 2024
2 parents 1f9b261 + 50f76e7 commit f7f1c52
Show file tree
Hide file tree
Showing 8 changed files with 32 additions and 20 deletions.
7 changes: 3 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.0</version>
<version>3.2.2</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>eu.dissco.core</groupId>
Expand All @@ -21,9 +21,8 @@
<jaxb2-maven-plugin.version>3.1.0</jaxb2-maven-plugin.version>
<jakarta.activation-api.version>2.1.2</jakarta.activation-api.version>
<jakarta.xml.bind-api.version>4.0.1</jakarta.xml.bind-api.version>
<logback.version>1.4.12</logback.version>
<mockito-inline.version>5.2.0</mockito-inline.version>
<testcontainers.version>1.19.0</testcontainers.version>
<testcontainers.version>1.19.5</testcontainers.version>
<sonar.organization>dissco</sonar.organization>
<sonar.host.url>https://sonarcloud.io</sonar.host.url>
<sonar.coverage.jacoco.xmlReportPaths>../app-it/target/site/jacoco-aggregate/jacoco.xml
Expand Down Expand Up @@ -88,7 +87,7 @@
<artifactId>dwca-io</artifactId>
<version>${dwca-io.version}</version>
</dependency>
<!-- Needed to overwrite gbif dep with security issue in this lib -->
<!-- Needed to overwrite gbif dep with security issue in this lib -->
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ private void processUnit(DataSet dataset, Unit unit)
attributes.getOdsNormalisedPhysicalSpecimenId(), unit,
attributes.getDwcInstitutionId());
log.debug("Result digital Specimen: {}", digitalSpecimen);
kafkaService.sendMessage("digital-specimen",
kafkaService.sendMessage(
new DigitalSpecimenEvent(
enrichmentServices(false),
digitalSpecimen,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ private void processDigitalSpecimen(Collection<ObjectNode> fullRecords,
log.debug("Digital Specimen: {}", digitalObjects);
var translatorEvent = new DigitalSpecimenEvent(enrichmentServices(false),
digitalObjects.getLeft(), digitalObjects.getRight());
kafkaService.sendMessage("digital-specimen", translatorEvent);
kafkaService.sendMessage(translatorEvent);
} catch (DiSSCoDataException e) {
log.error("Encountered data issue with record: {}", fullRecord, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dissco.core.translator.domain.DigitalSpecimenEvent;
import eu.dissco.core.translator.properties.KafkaProperties;
import java.util.concurrent.CompletableFuture;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -16,10 +17,11 @@
public class KafkaService {

private final KafkaTemplate<String, String> kafkaTemplate;
private final KafkaProperties properties;
private final ObjectMapper mapper;

public void sendMessage(String topic, DigitalSpecimenEvent event) throws JsonProcessingException {
CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic,
public void sendMessage(DigitalSpecimenEvent event) throws JsonProcessingException {
CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(properties.getTopic(),
mapper.writeValueAsString(event));
future.whenComplete((result, ex) -> {
if (ex != null) {
Expand Down
7 changes: 7 additions & 0 deletions src/main/resources/json-schema/identifications.json
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,13 @@
"Roptrocerus typographi (Györfi, 1952)"
]
},
"ods:scientificNameHtmlLabel": {
"type": "string",
"description": "A Hyper Text Markup Language (HTML) representation of the scientific name. Includes correct formatting of the name.",
"examples": [
"<i>Absidia ginsan</i> Komin. et al., 1952"
]
},
"dwc:scientificNameAuthorship": {
"type": "string",
"description": "https://rs.tdwg.org/dwc/terms/scientificNameAuthorship",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ void testRetrieveData206() throws Exception {

// Then
then(webClient).should(times(2)).get();
then(kafkaService).should(times(99)).sendMessage(eq("digital-specimen"), any(
then(kafkaService).should(times(99)).sendMessage(any(
DigitalSpecimenEvent.class));
}

Expand All @@ -123,7 +123,7 @@ void testRetrieveDataWithMedia206() throws Exception {

// Then
then(webClient).should(times(1)).get();
then(kafkaService).should(times(100)).sendMessage(eq("digital-specimen"), any(
then(kafkaService).should(times(100)).sendMessage( any(
DigitalSpecimenEvent.class));
}

Expand All @@ -147,7 +147,7 @@ void testRetrieveDataInvalidMedia() throws Exception {
// Then
var captor = ArgumentCaptor.forClass(DigitalSpecimenEvent.class);
then(webClient).should(times(1)).get();
then(kafkaService).should(times(1)).sendMessage(eq("digital-specimen"), captor.capture());
then(kafkaService).should(times(1)).sendMessage(captor.capture());
assertThat(captor.getValue().digitalMediaObjectEvents()).isEmpty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ void testRetrieveData() throws Exception {
// Then
then(dwcaRepository).should(times(2)).createTable(anyString());
then(dwcaRepository).should(times(2)).postRecords(anyString(), anyList());
then(kafkaService).should(times(9)).sendMessage(eq("digital-specimen"), any(
then(kafkaService).should(times(9)).sendMessage(any(
DigitalSpecimenEvent.class));
assertThat(captor.getValue().get("eml:license").asText()).isEqualTo(
"http://creativecommons.org/licenses/by-nc/4.0/legalcode");
Expand Down Expand Up @@ -160,7 +160,7 @@ void testRetrieveDataEmlException() throws Exception {
// Then
then(dwcaRepository).should(times(2)).createTable(anyString());
then(dwcaRepository).should(times(2)).postRecords(anyString(), anyList());
then(kafkaService).should(times(9)).sendMessage(eq("digital-specimen"), any(
then(kafkaService).should(times(9)).sendMessage( any(
DigitalSpecimenEvent.class));
assertThat(captor.getValue().get("eml:license")).isNull();
assertThat(captor.getValue().get("eml:title")).isNull();
Expand All @@ -183,7 +183,7 @@ void testRetrieveDataWithLicenseText() throws Exception {
// Then
then(dwcaRepository).should(times(2)).createTable(anyString());
then(dwcaRepository).should(times(2)).postRecords(anyString(), anyList());
then(kafkaService).should(times(9)).sendMessage(eq("digital-specimen"), any(
then(kafkaService).should(times(9)).sendMessage(any(
DigitalSpecimenEvent.class));
assertThat(captor.getValue().get("eml:license").asText()).isEqualTo(
"Creative Commons Attribution Non Commercial (CC-BY-NC) 4.0 License");
Expand Down Expand Up @@ -246,7 +246,7 @@ void testRetrieveDataWithGbifMedia() throws Exception {
// Then
then(dwcaRepository).should(times(3)).createTable(anyString());
then(dwcaRepository).should(times(2)).postRecords(anyString(), anyList());
then(kafkaService).should(times(19)).sendMessage(eq("digital-specimen"), any(
then(kafkaService).should(times(19)).sendMessage(any(
DigitalSpecimenEvent.class));
cleanup("src/test/resources/dwca/test/dwca-kew-gbif-media.zip");
}
Expand Down Expand Up @@ -282,7 +282,7 @@ void testRetrieveDataWithAcMedia() throws Exception {
// Then
then(dwcaRepository).should(times(2)).createTable(anyString());
then(dwcaRepository).should(times(2)).postRecords(anyString(), anyList());
then(kafkaService).should(times(14)).sendMessage(eq("digital-specimen"), any(
then(kafkaService).should(times(14)).sendMessage(any(
DigitalSpecimenEvent.class));
cleanup("src/test/resources/dwca/test/dwca-naturalis-ac-media.zip");
}
Expand All @@ -307,7 +307,7 @@ void testRetrieveDataWithInvalidAcMedia() throws Exception {
var captor = ArgumentCaptor.forClass(DigitalSpecimenEvent.class);
then(dwcaRepository).should(times(2)).createTable(anyString());
then(dwcaRepository).should(times(2)).postRecords(anyString(), anyList());
then(kafkaService).should(times(1)).sendMessage(eq("digital-specimen"), captor.capture());
then(kafkaService).should(times(1)).sendMessage(captor.capture());
assertThat(captor.getValue().digitalMediaObjectEvents()).isEmpty();
cleanup("src/test/resources/dwca/test/dwca-invalid-ac-media.zip");
}
Expand Down Expand Up @@ -346,7 +346,7 @@ void testRetrieveDataWithAssociatedMedia() throws Exception {
// Then
then(dwcaRepository).should(times(1)).createTable(anyString());
then(dwcaRepository).should(times(1)).postRecords(anyString(), anyList());
then(kafkaService).should(times(20)).sendMessage(eq("digital-specimen"), any(
then(kafkaService).should(times(20)).sendMessage(any(
DigitalSpecimenEvent.class));
cleanup("src/test/resources/dwca/test/dwca-lux-associated-media.zip");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import eu.dissco.core.translator.domain.DigitalMediaObjectEvent;
import eu.dissco.core.translator.domain.DigitalSpecimenEvent;
import eu.dissco.core.translator.domain.DigitalSpecimenWrapper;
import eu.dissco.core.translator.properties.KafkaProperties;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -30,22 +31,25 @@ class KafkaServiceTest {
private KafkaTemplate<String, String> kafkaTemplate;
@Mock
private SendResult<String, String> sendResult;
@Mock
private KafkaProperties properties;
private KafkaService service;

@BeforeEach
void setup() {
this.service = new KafkaService(kafkaTemplate, MAPPER);
this.service = new KafkaService(kafkaTemplate, properties, MAPPER);
}

@Test
void testSendMessage() throws JsonProcessingException {
// Given
var x = CompletableFuture.completedFuture(sendResult);
given(kafkaTemplate.send(anyString(), anyString())).willReturn(x);
given(properties.getTopic()).willReturn("test-topic");
var digitalSpecimenEvent = givenDigitalSpecimenEvent();

// When
service.sendMessage("test-topic", digitalSpecimenEvent);
service.sendMessage(digitalSpecimenEvent);

// Then
then(kafkaTemplate).should()
Expand Down

0 comments on commit f7f1c52

Please sign in to comment.