Skip to content
This repository has been archived by the owner on Jul 26, 2024. It is now read-only.

Commit

Permalink
Introduce @KafkaProducer interceptor.
Browse files Browse the repository at this point in the history
  • Loading branch information
Ralf Ueberfuhr committed Jun 27, 2024
1 parent f548945 commit ab3c97f
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
import de.sample.schulung.accounts.domain.events.CustomerCreatedEvent;
import de.sample.schulung.accounts.domain.events.CustomerDeletedEvent;
import de.sample.schulung.accounts.domain.events.CustomerReplacedEvent;
import de.sample.schulung.accounts.kafka.interceptor.KafkaProducer;
import de.sample.schulung.accounts.kafka.interceptor.KafkaRecord;
import lombok.RequiredArgsConstructor;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.util.UUID;
Expand All @@ -14,42 +15,32 @@
@RequiredArgsConstructor
public class CustomerEventsProducer {

private final KafkaTemplate<UUID, Object> kafkaTemplate;
private final CustomerEventRecordMapper mapper;

@EventListener
public void handleCustomerCreatedEvent(CustomerCreatedEvent event) {
// map event to record
var payload = this.mapper.map(event);
// send message
kafkaTemplate.send(
KafkaConstants.CUSTOMER_EVENTS_TOPIC,
@KafkaProducer(topic = KafkaConstants.CUSTOMER_EVENTS_TOPIC)
public KafkaRecord<UUID, CustomerEventRecord> handleCustomerCreatedEvent(CustomerCreatedEvent event) {
return new KafkaRecord<>(
event.customer().getUuid(),
payload
this.mapper.map(event)
);
}

@EventListener
public void handleCustomerReplacedEvent(CustomerReplacedEvent event) {
// map event to record
var payload = this.mapper.map(event);
// send message
kafkaTemplate.send(
KafkaConstants.CUSTOMER_EVENTS_TOPIC,
@KafkaProducer(topic = KafkaConstants.CUSTOMER_EVENTS_TOPIC)
public KafkaRecord<UUID, CustomerEventRecord> handleCustomerReplacedEvent(CustomerReplacedEvent event) {
return new KafkaRecord<>(
event.customer().getUuid(),
payload
this.mapper.map(event)
);
}

@EventListener
public void handleCustomerDeletedEvent(CustomerDeletedEvent event) {
// map event to record
var payload = this.mapper.map(event);
// send message
kafkaTemplate.send(
KafkaConstants.CUSTOMER_EVENTS_TOPIC,
@KafkaProducer(topic = KafkaConstants.CUSTOMER_EVENTS_TOPIC)
public KafkaRecord<UUID, CustomerEventRecord> handleCustomerDeletedEvent(CustomerDeletedEvent event) {
return new KafkaRecord<>(
event.uuid(),
payload
this.mapper.map(event)
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package de.sample.schulung.accounts.kafka.interceptor;

import java.lang.annotation.*;

/**
* Annotate a method to get the return value
* sent to a Kafka topic. The method can return a simple object that is then sent as a value
* without a key, or it can return an instance of {@link KafkaRecord}.
* If the method returns <tt>null</tt> (or has a void return type), no message is produced.
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface KafkaProducer {

/**
* The name of the topic.
*
* @return the name of the topic
*/
String topic();

/**
* The partition. Leave empty, if the Partitioner should do the job.
*
* @return the partition
*/
int partition() default -1;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package de.sample.schulung.accounts.kafka.interceptor;

import org.aopalliance.intercept.MethodInterceptor;
import org.springframework.aop.Pointcut;
import org.springframework.aop.framework.autoproxy.AbstractBeanFactoryAwareAdvisingPostProcessor;
import org.springframework.aop.support.DefaultPointcutAdvisor;
import org.springframework.aop.support.annotation.AnnotationMatchingPointcut;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
@SuppressWarnings("unused")
public class KafkaProducerInterceptor
extends AbstractBeanFactoryAwareAdvisingPostProcessor
implements InitializingBean {

private final MethodInterceptor advice;

public KafkaProducerInterceptor(final KafkaTemplate<Object, Object> kafkaTemplate) {
this.advice = invocation -> {
final var result = invocation.proceed();
if (result == null) {
return null;
}
final var annotation = AnnotationUtils.findAnnotation(invocation.getMethod(), KafkaProducer.class);
if (annotation != null) {
final Object key;
final Object value;
if (result instanceof KafkaRecord<?, ?> r) {
key = r.key();
value = r.value();
} else {
key = null;
value = result;
}
//noinspection DataFlowIssue
kafkaTemplate.send(
annotation.topic(),
annotation.partition() < 0 ? null : annotation.partition(),
key,
value
);
}
return result;
};
}

@Override
public void afterPropertiesSet() {
Pointcut pointcut = new AnnotationMatchingPointcut(null, KafkaProducer.class, true);
this.advisor = new DefaultPointcutAdvisor(pointcut, advice);
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package de.sample.schulung.accounts.kafka.interceptor;

/**
* Return a KafkaRecord from a {@link KafkaProducer} method
* to get a key and a value sent.
*/
public record KafkaRecord<K, V>(
K key,
V value
) {

}

0 comments on commit ab3c97f

Please sign in to comment.