diff --git a/README.md b/README.md index 624853b..fcfa643 100644 --- a/README.md +++ b/README.md @@ -29,3 +29,30 @@ java -jar target/.jar # oder mvn spring-boot:run ``` + +# Customer Events über Kafka + +Producer und Consumer benötigen eine wohldefinierte Schnittstelle. Diese wird hier dokumentiert. + +- Name des Topics: `customer-events` +- Aufteilung der Nachrichten (Partitionierung) nach der UUID des Kunden + (Events für denselben Kunden müssen in derselben Partition landen) + - Message Key: `UUID` + - Format: `String` +- Customer-Daten sowie die Art des Events als Payload + - Message Payload: `CustomerEventRecord` + - Format: `JSON` + +Hier ein Beispiel für den Payload: + +```json +{ + "event_type": "created|replaced|deleted", + "uuid": "12345", + "customer": { + "name": "Tom Mayer", + "birthdate": "2002-10-05", + "state": "active|locked|disabled" + } +} +``` \ No newline at end of file diff --git a/account-service-provider/pom.xml b/account-service-provider/pom.xml index 48d9504..2b2379b 100644 --- a/account-service-provider/pom.xml +++ b/account-service-provider/pom.xml @@ -55,6 +55,15 @@ h2 runtime + + org.springframework.kafka + spring-kafka + + + org.springframework.kafka + spring-kafka-test + test + org.springframework.boot spring-boot-starter-test diff --git a/account-service-provider/src/main/java/de/sample/schulung/accounts/kafka/CustomJsonSerializer.java b/account-service-provider/src/main/java/de/sample/schulung/accounts/kafka/CustomJsonSerializer.java new file mode 100644 index 0000000..ea51a17 --- /dev/null +++ b/account-service-provider/src/main/java/de/sample/schulung/accounts/kafka/CustomJsonSerializer.java @@ -0,0 +1,23 @@ +package de.sample.schulung.accounts.kafka; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.PropertyNamingStrategies; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.springframework.kafka.support.JacksonUtils; +import org.springframework.kafka.support.serializer.JsonSerializer; + +public class CustomJsonSerializer extends JsonSerializer { + + private static ObjectMapper createCustomObjectMapper() { + final var result = JacksonUtils.enhancedObjectMapper(); + result.setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE); + result.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + result.registerModule(new JavaTimeModule()); + return result; + } + + public CustomJsonSerializer() { + super(createCustomObjectMapper()); + } +} \ No newline at end of file diff --git a/account-service-provider/src/main/java/de/sample/schulung/accounts/kafka/CustomerEventRecord.java b/account-service-provider/src/main/java/de/sample/schulung/accounts/kafka/CustomerEventRecord.java new file mode 100644 index 0000000..4853584 --- /dev/null +++ b/account-service-provider/src/main/java/de/sample/schulung/accounts/kafka/CustomerEventRecord.java @@ -0,0 +1,10 @@ +package de.sample.schulung.accounts.kafka; + +import java.util.UUID; + +public record CustomerEventRecord( + String eventType, + UUID uuid, + CustomerRecord customer +) { +} diff --git a/account-service-provider/src/main/java/de/sample/schulung/accounts/kafka/CustomerEventsProducer.java b/account-service-provider/src/main/java/de/sample/schulung/accounts/kafka/CustomerEventsProducer.java new file mode 100644 index 0000000..569f64f --- /dev/null +++ b/account-service-provider/src/main/java/de/sample/schulung/accounts/kafka/CustomerEventsProducer.java @@ -0,0 +1,42 @@ +package de.sample.schulung.accounts.kafka; + +import de.sample.schulung.accounts.domain.events.CustomerCreatedEvent; +import lombok.RequiredArgsConstructor; +import org.springframework.context.event.EventListener; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Component; + +import java.util.UUID; + +@Component +@RequiredArgsConstructor +public class CustomerEventsProducer { + + private final KafkaTemplate kafkaTemplate; + + @EventListener + public void handleCustomerCreatedEvent(CustomerCreatedEvent event) { + // map event to record + var customer = event.customer(); + var payload = new CustomerEventRecord( + "created", + customer.getUuid(), + new CustomerRecord( + customer.getName(), + customer.getDateOfBirth(), + switch(customer.getState()) { + case ACTIVE -> "active"; + case LOCKED -> "locked"; + case DISABLED -> "disabled"; + } + ) + ); + // send message + kafkaTemplate.send( + KafkaConstants.CUSTOMER_EVENTS_TOPIC, + event.customer().getUuid(), + payload + ); + } + +} diff --git a/account-service-provider/src/main/java/de/sample/schulung/accounts/kafka/CustomerRecord.java b/account-service-provider/src/main/java/de/sample/schulung/accounts/kafka/CustomerRecord.java new file mode 100644 index 0000000..a139a3c --- /dev/null +++ b/account-service-provider/src/main/java/de/sample/schulung/accounts/kafka/CustomerRecord.java @@ -0,0 +1,10 @@ +package de.sample.schulung.accounts.kafka; + +import java.time.LocalDate; + +public record CustomerRecord( + String name, + LocalDate birthdate, + String state +) { +} diff --git a/account-service-provider/src/main/java/de/sample/schulung/accounts/kafka/KafkaConfiguration.java b/account-service-provider/src/main/java/de/sample/schulung/accounts/kafka/KafkaConfiguration.java new file mode 100644 index 0000000..70fceda --- /dev/null +++ b/account-service-provider/src/main/java/de/sample/schulung/accounts/kafka/KafkaConfiguration.java @@ -0,0 +1,34 @@ +package de.sample.schulung.accounts.kafka; + +import com.fasterxml.jackson.databind.PropertyNamingStrategies; +import org.apache.kafka.clients.admin.NewTopic; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.event.ContextRefreshedEvent; +import org.springframework.context.event.EventListener; +import org.springframework.kafka.config.TopicBuilder; +import org.springframework.kafka.support.JacksonUtils; + +@Configuration +public class KafkaConfiguration { + + // Topic beim Start der Anwendung erzeugen + // -> nur für lokale Tests, NICHT für Produktion + // siehe application.yml: spring.kafka.admin.auto-create + @Bean + public NewTopic customerEventsTopic() { + return TopicBuilder + .name(KafkaConstants.CUSTOMER_EVENTS_TOPIC) + // .partitions(3) + // .replicas(3) + .build(); + } + + @EventListener(ContextRefreshedEvent.class) + public void configureJsonSerializer() { + JacksonUtils + .enhancedObjectMapper() + .setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE); + } + +} diff --git a/account-service-provider/src/main/java/de/sample/schulung/accounts/kafka/KafkaConstants.java b/account-service-provider/src/main/java/de/sample/schulung/accounts/kafka/KafkaConstants.java new file mode 100644 index 0000000..f5e30a7 --- /dev/null +++ b/account-service-provider/src/main/java/de/sample/schulung/accounts/kafka/KafkaConstants.java @@ -0,0 +1,10 @@ +package de.sample.schulung.accounts.kafka; + +import lombok.experimental.UtilityClass; + +@UtilityClass +public class KafkaConstants { + + public final String CUSTOMER_EVENTS_TOPIC = "customer-events"; + +} diff --git a/account-service-provider/src/main/resources/application.yml b/account-service-provider/src/main/resources/application.yml index ad33ce8..dd84426 100644 --- a/account-service-provider/src/main/resources/application.yml +++ b/account-service-provider/src/main/resources/application.yml @@ -16,6 +16,17 @@ spring: console: path: /db enabled: true + kafka: + admin: + auto-create: true + bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092} + producer: + key-serializer: org.apache.kafka.common.serialization.UUIDSerializer + # https://docs.spring.io/spring-kafka/reference/kafka/serdes.html#json-serde (Jackson) + value-serializer: de.sample.schulung.accounts.kafka.CustomJsonSerializer + properties: + # do not serialize the class name into the message + "[spring.json.add.type.headers]": false application: customers: initialization: diff --git a/account-service-provider/src/test/java/de/sample/schulung/accounts/AccountsApiTests.java b/account-service-provider/src/test/java/de/sample/schulung/accounts/AccountsApiTests.java index 737c86e..ca970ed 100644 --- a/account-service-provider/src/test/java/de/sample/schulung/accounts/AccountsApiTests.java +++ b/account-service-provider/src/test/java/de/sample/schulung/accounts/AccountsApiTests.java @@ -1,5 +1,6 @@ package de.sample.schulung.accounts; +import de.sample.schulung.accounts.kafka.AutoConfigureKafkaTemplateMock; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; @@ -18,6 +19,7 @@ @SpringBootTest @AutoConfigureMockMvc @AutoConfigureTestDatabase +@AutoConfigureKafkaTemplateMock class AccountsApiTests { @Autowired diff --git a/account-service-provider/src/test/java/de/sample/schulung/accounts/IndexPageTests.java b/account-service-provider/src/test/java/de/sample/schulung/accounts/IndexPageTests.java index 259c02c..8ad13da 100644 --- a/account-service-provider/src/test/java/de/sample/schulung/accounts/IndexPageTests.java +++ b/account-service-provider/src/test/java/de/sample/schulung/accounts/IndexPageTests.java @@ -1,5 +1,6 @@ package de.sample.schulung.accounts; +import de.sample.schulung.accounts.kafka.AutoConfigureKafkaTemplateMock; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.autoconfigure.jdbc.AutoConfigureTestDatabase; @@ -15,6 +16,7 @@ @SpringBootTest @AutoConfigureMockMvc @AutoConfigureTestDatabase +@AutoConfigureKafkaTemplateMock public class IndexPageTests { @Autowired diff --git a/account-service-provider/src/test/java/de/sample/schulung/accounts/boundary/AccountsBoundaryTests.java b/account-service-provider/src/test/java/de/sample/schulung/accounts/boundary/AccountsBoundaryTests.java index af00809..8d9b5ac 100644 --- a/account-service-provider/src/test/java/de/sample/schulung/accounts/boundary/AccountsBoundaryTests.java +++ b/account-service-provider/src/test/java/de/sample/schulung/accounts/boundary/AccountsBoundaryTests.java @@ -2,6 +2,7 @@ import de.sample.schulung.accounts.domain.CustomersService; import de.sample.schulung.accounts.domain.NotFoundException; +import de.sample.schulung.accounts.kafka.AutoConfigureKafkaTemplateMock; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.autoconfigure.jdbc.AutoConfigureTestDatabase; @@ -28,6 +29,7 @@ ) @AutoConfigureMockMvc @AutoConfigureTestDatabase +@AutoConfigureKafkaTemplateMock public class AccountsBoundaryTests { @Autowired diff --git a/account-service-provider/src/test/java/de/sample/schulung/accounts/domain/CustomersInitializerTests.java b/account-service-provider/src/test/java/de/sample/schulung/accounts/domain/CustomersInitializerTests.java index 953e204..d4d57d5 100644 --- a/account-service-provider/src/test/java/de/sample/schulung/accounts/domain/CustomersInitializerTests.java +++ b/account-service-provider/src/test/java/de/sample/schulung/accounts/domain/CustomersInitializerTests.java @@ -1,5 +1,6 @@ package de.sample.schulung.accounts.domain; +import de.sample.schulung.accounts.kafka.AutoConfigureKafkaTemplateMock; import org.junit.jupiter.api.Test; import org.springframework.boot.test.autoconfigure.jdbc.AutoConfigureTestDatabase; import org.springframework.boot.test.context.SpringBootTest; @@ -15,6 +16,7 @@ } ) @AutoConfigureTestDatabase +@AutoConfigureKafkaTemplateMock public class CustomersInitializerTests { @MockBean diff --git a/account-service-provider/src/test/java/de/sample/schulung/accounts/domain/CustomersServiceTest.java b/account-service-provider/src/test/java/de/sample/schulung/accounts/domain/CustomersServiceTest.java index 3e7be1c..b1cedd6 100644 --- a/account-service-provider/src/test/java/de/sample/schulung/accounts/domain/CustomersServiceTest.java +++ b/account-service-provider/src/test/java/de/sample/schulung/accounts/domain/CustomersServiceTest.java @@ -1,5 +1,6 @@ package de.sample.schulung.accounts.domain; +import de.sample.schulung.accounts.kafka.AutoConfigureKafkaTemplateMock; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.autoconfigure.jdbc.AutoConfigureTestDatabase; @@ -12,6 +13,7 @@ @SpringBootTest @AutoConfigureTestDatabase +@AutoConfigureKafkaTemplateMock public class CustomersServiceTest { @Autowired diff --git a/account-service-provider/src/test/java/de/sample/schulung/accounts/kafka/AutoConfigureKafkaTemplateMock.java b/account-service-provider/src/test/java/de/sample/schulung/accounts/kafka/AutoConfigureKafkaTemplateMock.java new file mode 100644 index 0000000..d7b1242 --- /dev/null +++ b/account-service-provider/src/test/java/de/sample/schulung/accounts/kafka/AutoConfigureKafkaTemplateMock.java @@ -0,0 +1,26 @@ +package de.sample.schulung.accounts.kafka; + +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.kafka.core.KafkaTemplate; + +import java.lang.annotation.*; + +/** + * Auto-configures a {@link KafkaTemplate} mock in the test context. + * You can get the mock injected by simply using + *
+ * \u0040Autowired
+ * KafkaTemplate<String, CustomerDto> templateMock;
+ * 
+ */ +@Documented +@Inherited +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.TYPE) +@EnableAutoConfiguration(exclude = KafkaAutoConfiguration.class) +@MockBean(KafkaTemplate.class) +public @interface AutoConfigureKafkaTemplateMock { + +}