From ab3c97fde144b1fe9b73f160474ebb7539f52455 Mon Sep 17 00:00:00 2001 From: Ralf Ueberfuhr Date: Thu, 27 Jun 2024 17:04:06 +0200 Subject: [PATCH] Introduce @KafkaProducer interceptor. --- .../kafka/CustomerEventsProducer.java | 37 +++++------- .../kafka/interceptor/KafkaProducer.java | 31 ++++++++++ .../interceptor/KafkaProducerInterceptor.java | 57 +++++++++++++++++++ .../kafka/interceptor/KafkaRecord.java | 12 ++++ 4 files changed, 114 insertions(+), 23 deletions(-) create mode 100644 account-service-provider/src/main/java/de/sample/schulung/accounts/kafka/interceptor/KafkaProducer.java create mode 100644 account-service-provider/src/main/java/de/sample/schulung/accounts/kafka/interceptor/KafkaProducerInterceptor.java create mode 100644 account-service-provider/src/main/java/de/sample/schulung/accounts/kafka/interceptor/KafkaRecord.java diff --git a/account-service-provider/src/main/java/de/sample/schulung/accounts/kafka/CustomerEventsProducer.java b/account-service-provider/src/main/java/de/sample/schulung/accounts/kafka/CustomerEventsProducer.java index 44664e7..cd6e342 100644 --- a/account-service-provider/src/main/java/de/sample/schulung/accounts/kafka/CustomerEventsProducer.java +++ b/account-service-provider/src/main/java/de/sample/schulung/accounts/kafka/CustomerEventsProducer.java @@ -3,9 +3,10 @@ import de.sample.schulung.accounts.domain.events.CustomerCreatedEvent; import de.sample.schulung.accounts.domain.events.CustomerDeletedEvent; import de.sample.schulung.accounts.domain.events.CustomerReplacedEvent; +import de.sample.schulung.accounts.kafka.interceptor.KafkaProducer; +import de.sample.schulung.accounts.kafka.interceptor.KafkaRecord; import lombok.RequiredArgsConstructor; import org.springframework.context.event.EventListener; -import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; import java.util.UUID; @@ -14,42 +15,32 @@ @RequiredArgsConstructor public class CustomerEventsProducer { - private final KafkaTemplate kafkaTemplate; private final CustomerEventRecordMapper mapper; @EventListener - public void handleCustomerCreatedEvent(CustomerCreatedEvent event) { - // map event to record - var payload = this.mapper.map(event); - // send message - kafkaTemplate.send( - KafkaConstants.CUSTOMER_EVENTS_TOPIC, + @KafkaProducer(topic = KafkaConstants.CUSTOMER_EVENTS_TOPIC) + public KafkaRecord handleCustomerCreatedEvent(CustomerCreatedEvent event) { + return new KafkaRecord<>( event.customer().getUuid(), - payload + this.mapper.map(event) ); } @EventListener - public void handleCustomerReplacedEvent(CustomerReplacedEvent event) { - // map event to record - var payload = this.mapper.map(event); - // send message - kafkaTemplate.send( - KafkaConstants.CUSTOMER_EVENTS_TOPIC, + @KafkaProducer(topic = KafkaConstants.CUSTOMER_EVENTS_TOPIC) + public KafkaRecord handleCustomerReplacedEvent(CustomerReplacedEvent event) { + return new KafkaRecord<>( event.customer().getUuid(), - payload + this.mapper.map(event) ); } @EventListener - public void handleCustomerDeletedEvent(CustomerDeletedEvent event) { - // map event to record - var payload = this.mapper.map(event); - // send message - kafkaTemplate.send( - KafkaConstants.CUSTOMER_EVENTS_TOPIC, + @KafkaProducer(topic = KafkaConstants.CUSTOMER_EVENTS_TOPIC) + public KafkaRecord handleCustomerDeletedEvent(CustomerDeletedEvent event) { + return new KafkaRecord<>( event.uuid(), - payload + this.mapper.map(event) ); } diff --git a/account-service-provider/src/main/java/de/sample/schulung/accounts/kafka/interceptor/KafkaProducer.java b/account-service-provider/src/main/java/de/sample/schulung/accounts/kafka/interceptor/KafkaProducer.java new file mode 100644 index 0000000..addaef2 --- /dev/null +++ b/account-service-provider/src/main/java/de/sample/schulung/accounts/kafka/interceptor/KafkaProducer.java @@ -0,0 +1,31 @@ +package de.sample.schulung.accounts.kafka.interceptor; + +import java.lang.annotation.*; + +/** + * Annotate a method to get the return value + * sent to a Kafka topic. The method can return a simple object that is then sent as a value + * without a key, or it can return an instance of {@link KafkaRecord}. + * If the method returns null (or has a void return type), no message is produced. + */ +@Target(ElementType.METHOD) +@Retention(RetentionPolicy.RUNTIME) +@Inherited +@Documented +public @interface KafkaProducer { + + /** + * The name of the topic. + * + * @return the name of the topic + */ + String topic(); + + /** + * The partition. Leave empty, if the Partitioner should do the job. + * + * @return the partition + */ + int partition() default -1; + +} \ No newline at end of file diff --git a/account-service-provider/src/main/java/de/sample/schulung/accounts/kafka/interceptor/KafkaProducerInterceptor.java b/account-service-provider/src/main/java/de/sample/schulung/accounts/kafka/interceptor/KafkaProducerInterceptor.java new file mode 100644 index 0000000..79f84ee --- /dev/null +++ b/account-service-provider/src/main/java/de/sample/schulung/accounts/kafka/interceptor/KafkaProducerInterceptor.java @@ -0,0 +1,57 @@ +package de.sample.schulung.accounts.kafka.interceptor; + +import org.aopalliance.intercept.MethodInterceptor; +import org.springframework.aop.Pointcut; +import org.springframework.aop.framework.autoproxy.AbstractBeanFactoryAwareAdvisingPostProcessor; +import org.springframework.aop.support.DefaultPointcutAdvisor; +import org.springframework.aop.support.annotation.AnnotationMatchingPointcut; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.core.annotation.AnnotationUtils; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Component; + +@Component +@SuppressWarnings("unused") +public class KafkaProducerInterceptor + extends AbstractBeanFactoryAwareAdvisingPostProcessor + implements InitializingBean { + + private final MethodInterceptor advice; + + public KafkaProducerInterceptor(final KafkaTemplate kafkaTemplate) { + this.advice = invocation -> { + final var result = invocation.proceed(); + if (result == null) { + return null; + } + final var annotation = AnnotationUtils.findAnnotation(invocation.getMethod(), KafkaProducer.class); + if (annotation != null) { + final Object key; + final Object value; + if (result instanceof KafkaRecord r) { + key = r.key(); + value = r.value(); + } else { + key = null; + value = result; + } + //noinspection DataFlowIssue + kafkaTemplate.send( + annotation.topic(), + annotation.partition() < 0 ? null : annotation.partition(), + key, + value + ); + } + return result; + }; + } + + @Override + public void afterPropertiesSet() { + Pointcut pointcut = new AnnotationMatchingPointcut(null, KafkaProducer.class, true); + this.advisor = new DefaultPointcutAdvisor(pointcut, advice); + } + + +} \ No newline at end of file diff --git a/account-service-provider/src/main/java/de/sample/schulung/accounts/kafka/interceptor/KafkaRecord.java b/account-service-provider/src/main/java/de/sample/schulung/accounts/kafka/interceptor/KafkaRecord.java new file mode 100644 index 0000000..e9af691 --- /dev/null +++ b/account-service-provider/src/main/java/de/sample/schulung/accounts/kafka/interceptor/KafkaRecord.java @@ -0,0 +1,12 @@ +package de.sample.schulung.accounts.kafka.interceptor; + +/** + * Return a KafkaRecord from a {@link KafkaProducer} method + * to get a key and a value sent. + */ +public record KafkaRecord( + K key, + V value +) { + +}