This repository has been archived by the owner on Jul 26, 2024. It is now read-only.

Merge pull request #3 from ralf-ueberfuhr-ars/feature/consumer

Add Kafka producer.

ralf-ueberfuhr-ars authored Jun 27, 2024
2 parents 5aa00e9 + 0f54aa4 commit 6fe01a5

Showing 15 changed files with 212 additions and 0 deletions.
27 changes: 27 additions & 0 deletions README.md
@@ -29,3 +29,30 @@ java -jar target/<file>.jar
# or
mvn spring-boot:run
```

# Customer Events via Kafka

Producer and consumer need a well-defined interface, which is documented here.

- Topic name: `customer-events`
- Messages are partitioned by the customer's UUID
  (events for the same customer must end up in the same partition)
- Message Key: `UUID`
  - Format: `String`
- The payload contains the customer data as well as the type of the event
- Message Payload: `CustomerEventRecord`
  - Format: `JSON`

Here is an example of the payload:

```json
{
"event_type": "created|replaced|deleted",
"uuid": "12345",
"customer": {
"name": "Tom Mayer",
"birthdate": "2002-10-05",
"state": "active|locked|disabled"
}
}
```
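For the consuming side, a Spring Kafka listener honoring this contract could look roughly like the sketch below (not part of this repository). The class name, the group id, and the assumption that the consumer defines its own `CustomerEventRecord`/`CustomerRecord` types mirroring the payload are illustrative; the key and value deserializers (`UUIDDeserializer` and a JSON deserializer) would have to be configured accordingly.

```java
// Hypothetical consumer sketch - assumes record types mirroring the payload above
// and matching key/value deserializers in the consumer configuration.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.UUID;

@Component
public class CustomerEventsConsumer {

  @KafkaListener(topics = "customer-events", groupId = "customer-events-consumer")
  public void onCustomerEvent(ConsumerRecord<UUID, CustomerEventRecord> message) {
    final var event = message.value();
    // events for the same customer share a partition (keyed by the customer's UUID),
    // so they are consumed in the order they were produced
    switch (event.eventType()) {
      case "created", "replaced" -> { /* create or update the local projection */ }
      case "deleted" -> { /* remove the local projection */ }
      default -> { /* ignore unknown event types */ }
    }
  }

}
```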
9 changes: 9 additions & 0 deletions account-service-provider/pom.xml
@@ -55,6 +55,15 @@
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
@@ -0,0 +1,23 @@
package de.sample.schulung.accounts.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.springframework.kafka.support.JacksonUtils;
import org.springframework.kafka.support.serializer.JsonSerializer;

public class CustomJsonSerializer extends JsonSerializer<Object> {

private static ObjectMapper createCustomObjectMapper() {
final var result = JacksonUtils.enhancedObjectMapper();
result.setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE);
result.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
result.registerModule(new JavaTimeModule());
return result;
}

public CustomJsonSerializer() {
super(createCustomObjectMapper());
}
}
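A quick way to check what this serializer emits is a small unit test like the hypothetical sketch below (not part of this commit); it relies only on `JsonSerializer#serialize(String, T)` from Spring Kafka and the record types from this package.

```java
package de.sample.schulung.accounts.kafka;

import static org.assertj.core.api.Assertions.assertThat;

import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.util.UUID;

import org.junit.jupiter.api.Test;

// Hypothetical test sketch: property names should be snake_case, dates ISO-8601 strings.
class CustomJsonSerializerTests {

  @Test
  void shouldSerializeSnakeCaseAndIsoDates() {
    final var serializer = new CustomJsonSerializer();
    final var event = new CustomerEventRecord(
      "created",
      UUID.randomUUID(),
      new CustomerRecord("Tom Mayer", LocalDate.of(2002, 10, 5), "active")
    );

    final var json = new String(
      serializer.serialize("customer-events", event),
      StandardCharsets.UTF_8
    );

    assertThat(json).contains("\"event_type\":\"created\"");
    assertThat(json).contains("\"birthdate\":\"2002-10-05\"");
  }

}
```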
@@ -0,0 +1,10 @@
package de.sample.schulung.accounts.kafka;

import java.util.UUID;

public record CustomerEventRecord(
String eventType,
UUID uuid,
CustomerRecord customer
) {
}
@@ -0,0 +1,42 @@
package de.sample.schulung.accounts.kafka;

import de.sample.schulung.accounts.domain.events.CustomerCreatedEvent;
import lombok.RequiredArgsConstructor;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.util.UUID;

@Component
@RequiredArgsConstructor
public class CustomerEventsProducer {

private final KafkaTemplate<UUID, Object> kafkaTemplate;

@EventListener
public void handleCustomerCreatedEvent(CustomerCreatedEvent event) {
// map event to record
var customer = event.customer();
var payload = new CustomerEventRecord(
"created",
customer.getUuid(),
new CustomerRecord(
customer.getName(),
customer.getDateOfBirth(),
switch(customer.getState()) {
case ACTIVE -> "active";
case LOCKED -> "locked";
case DISABLED -> "disabled";
}
)
);
// send message
kafkaTemplate.send(
KafkaConstants.CUSTOMER_EVENTS_TOPIC,
event.customer().getUuid(),
payload
);
}

}
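For context, the producer is decoupled from the domain layer by Spring application events. A minimal sketch of the publishing side might look like the following; the class is hypothetical, `Customer` is assumed to live in the domain package, and `CustomerCreatedEvent` is assumed to be a record with a single `customer` component, matching the `event.customer()` call above.

```java
package de.sample.schulung.accounts.domain;

import de.sample.schulung.accounts.domain.events.CustomerCreatedEvent;
import lombok.RequiredArgsConstructor;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;

// Hypothetical sketch (not part of this commit): publishing the domain event
// that CustomerEventsProducer turns into a Kafka message.
@Service
@RequiredArgsConstructor
public class CustomerRegistrationSketch {

  private final ApplicationEventPublisher eventPublisher;

  public void register(Customer customer) {
    // ... validate and persist the customer here ...
    // publishing the event keeps the domain layer free of any Kafka dependency;
    // CustomerEventsProducer reacts to it via @EventListener
    eventPublisher.publishEvent(new CustomerCreatedEvent(customer));
  }

}
```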
@@ -0,0 +1,10 @@
package de.sample.schulung.accounts.kafka;

import java.time.LocalDate;

public record CustomerRecord(
String name,
LocalDate birthdate,
String state
) {
}
@@ -0,0 +1,34 @@
package de.sample.schulung.accounts.kafka;

import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.support.JacksonUtils;

@Configuration
public class KafkaConfiguration {

  // create the topic at application startup
  // -> for local testing only, NOT for production
  // see application.yml: spring.kafka.admin.auto-create
@Bean
public NewTopic customerEventsTopic() {
return TopicBuilder
.name(KafkaConstants.CUSTOMER_EVENTS_TOPIC)
// .partitions(3)
// .replicas(3)
.build();
}

  // Note: this creates a new ObjectMapper via JacksonUtils and discards it right away,
  // so it has no effect on the serializer Kafka actually uses; the snake_case setup
  // that takes effect is the one in CustomJsonSerializer.
  @EventListener(ContextRefreshedEvent.class)
public void configureJsonSerializer() {
JacksonUtils
.enhancedObjectMapper()
.setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE);
}

}
@@ -0,0 +1,10 @@
package de.sample.schulung.accounts.kafka;

import lombok.experimental.UtilityClass;

@UtilityClass
public class KafkaConstants {

public final String CUSTOMER_EVENTS_TOPIC = "customer-events";

}
11 changes: 11 additions & 0 deletions account-service-provider/src/main/resources/application.yml
@@ -16,6 +16,17 @@ spring:
console:
path: /db
enabled: true
kafka:
admin:
auto-create: true
bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
producer:
key-serializer: org.apache.kafka.common.serialization.UUIDSerializer
# https://docs.spring.io/spring-kafka/reference/kafka/serdes.html#json-serde (Jackson)
value-serializer: de.sample.schulung.accounts.kafka.CustomJsonSerializer
properties:
# do not serialize the class name into the message
"[spring.json.add.type.headers]": false
application:
customers:
initialization:
@@ -1,5 +1,6 @@
package de.sample.schulung.accounts;

import de.sample.schulung.accounts.kafka.AutoConfigureKafkaTemplateMock;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
@@ -18,6 +19,7 @@
@SpringBootTest
@AutoConfigureMockMvc
@AutoConfigureTestDatabase
@AutoConfigureKafkaTemplateMock
class AccountsApiTests {

@Autowired
@@ -1,5 +1,6 @@
package de.sample.schulung.accounts;

import de.sample.schulung.accounts.kafka.AutoConfigureKafkaTemplateMock;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.jdbc.AutoConfigureTestDatabase;
@@ -15,6 +16,7 @@
@SpringBootTest
@AutoConfigureMockMvc
@AutoConfigureTestDatabase
@AutoConfigureKafkaTemplateMock
public class IndexPageTests {

@Autowired
@@ -2,6 +2,7 @@

import de.sample.schulung.accounts.domain.CustomersService;
import de.sample.schulung.accounts.domain.NotFoundException;
import de.sample.schulung.accounts.kafka.AutoConfigureKafkaTemplateMock;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.jdbc.AutoConfigureTestDatabase;
@@ -28,6 +29,7 @@
)
@AutoConfigureMockMvc
@AutoConfigureTestDatabase
@AutoConfigureKafkaTemplateMock
public class AccountsBoundaryTests {

@Autowired
@@ -1,5 +1,6 @@
package de.sample.schulung.accounts.domain;

import de.sample.schulung.accounts.kafka.AutoConfigureKafkaTemplateMock;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.autoconfigure.jdbc.AutoConfigureTestDatabase;
import org.springframework.boot.test.context.SpringBootTest;
@@ -15,6 +16,7 @@
}
)
@AutoConfigureTestDatabase
@AutoConfigureKafkaTemplateMock
public class CustomersInitializerTests {

@MockBean
@@ -1,5 +1,6 @@
package de.sample.schulung.accounts.domain;

import de.sample.schulung.accounts.kafka.AutoConfigureKafkaTemplateMock;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.jdbc.AutoConfigureTestDatabase;
@@ -12,6 +13,7 @@

@SpringBootTest
@AutoConfigureTestDatabase
@AutoConfigureKafkaTemplateMock
public class CustomersServiceTest {

@Autowired
@@ -0,0 +1,26 @@
package de.sample.schulung.accounts.kafka;

import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.kafka.core.KafkaTemplate;

import java.lang.annotation.*;

/**
* Auto-configures a {@link KafkaTemplate} mock in the test context.
* You can get the mock injected by simply using
* <pre>
* \u0040Autowired
* KafkaTemplate&lt;String, CustomerDto&gt; templateMock;
* </pre>
*/
@Documented
@Inherited
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@EnableAutoConfiguration(exclude = KafkaAutoConfiguration.class)
@MockBean(KafkaTemplate.class)
public @interface AutoConfigureKafkaTemplateMock {

}
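A hypothetical test (not included in this commit) that illustrates the effect of the annotation: the context starts without a Kafka broker, and the injected `KafkaTemplate` is a Mockito mock whose interactions can be verified.

```java
package de.sample.schulung.accounts.kafka;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.UUID;

import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.jdbc.AutoConfigureTestDatabase;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;

// Hypothetical usage sketch, mirroring the existing test classes in this commit.
@SpringBootTest
@AutoConfigureTestDatabase
@AutoConfigureKafkaTemplateMock
class AutoConfigureKafkaTemplateMockTests {

  @Autowired
  KafkaTemplate<UUID, Object> kafkaTemplateMock;

  @Test
  void shouldInjectMockedKafkaTemplate() {
    // no broker needed: KafkaAutoConfiguration is excluded, the template is a Mockito mock
    assertThat(Mockito.mockingDetails(kafkaTemplateMock).isMock()).isTrue();
    // interactions can then be verified in API or domain tests, e.g.
    // verify(kafkaTemplateMock).send(eq(KafkaConstants.CUSTOMER_EVENTS_TOPIC), any(), any());
  }

}
```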
