Skip to content
This repository has been archived by the owner on Jul 26, 2024. It is now read-only.

Add test for producer. #8

Merged
merged 1 commit into from
Jun 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package de.sample.schulung.accounts;

import de.sample.schulung.accounts.domain.AutoConfigureDisabledInitialization;
import de.sample.schulung.accounts.kafka.AutoConfigureKafkaTemplateMock;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
Expand All @@ -20,6 +21,7 @@
@AutoConfigureMockMvc
@AutoConfigureTestDatabase
@AutoConfigureKafkaTemplateMock
@AutoConfigureDisabledInitialization
class AccountsApiTests {

@Autowired
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package de.sample.schulung.accounts;

import de.sample.schulung.accounts.domain.AutoConfigureDisabledInitialization;
import de.sample.schulung.accounts.kafka.AutoConfigureKafkaTemplateMock;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -17,6 +18,7 @@
@AutoConfigureMockMvc
@AutoConfigureTestDatabase
@AutoConfigureKafkaTemplateMock
@AutoConfigureDisabledInitialization
public class IndexPageTests {

@Autowired
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package de.sample.schulung.accounts.boundary;

import de.sample.schulung.accounts.domain.AutoConfigureDisabledInitialization;
import de.sample.schulung.accounts.domain.CustomersService;
import de.sample.schulung.accounts.domain.NotFoundException;
import de.sample.schulung.accounts.kafka.AutoConfigureKafkaTemplateMock;
Expand Down Expand Up @@ -30,6 +31,7 @@
@AutoConfigureMockMvc
@AutoConfigureTestDatabase
@AutoConfigureKafkaTemplateMock
@AutoConfigureDisabledInitialization
public class AccountsBoundaryTests {

@Autowired
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package de.sample.schulung.accounts.domain;

import org.springframework.test.context.TestPropertySource;

import java.lang.annotation.*;

@Documented
@Inherited
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@TestPropertySource(
properties = """
application.customers.initialization.enabled=false
"""
)
public @interface AutoConfigureDisabledInitialization {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package de.sample.schulung.accounts.domain;

import org.springframework.test.context.TestPropertySource;

import java.lang.annotation.*;

@Documented
@Inherited
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@TestPropertySource(
properties = """
application.customers.initialization.enabled=true
"""
)
public @interface AutoConfigureEnabledInitialization {
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
)
@AutoConfigureTestDatabase
@AutoConfigureKafkaTemplateMock
@AutoConfigureEnabledInitialization
public class CustomersInitializerTests {

@MockBean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
@SpringBootTest
@AutoConfigureTestDatabase
@AutoConfigureKafkaTemplateMock
@AutoConfigureDisabledInitialization
public class CustomersServiceTest {

@Autowired
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package de.sample.schulung.accounts.kafka;

import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.UUIDDeserializer;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.ContainerTestUtils;
import org.springframework.stereotype.Component;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringExtension;

import java.lang.annotation.*;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
* Auto-configures an {@link EmbeddedKafka}
* and provides an extension to run, reset and stop the container.<br/>
* We can get the following beans injected into our test class
* <pre>
* \u0040Autowired
* EmbeddedKafkaBroker kafka;
* \u0040Autowired
* KafkaTestContext&lt;Key,Value&gt; context;
* </pre>
*/
@Documented
@Inherited
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
// Kafka Configuration
@EmbeddedKafka
@TestPropertySource(
properties = """
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
# we disable this in production, but need this for the tests
spring.kafka.producer.properties."[spring.json.add.type.headers]"=true
"""
)
@Import({
AutoConfigureEmbeddedKafka.EmbeddedKafkaConfiguration.class,
AutoConfigureEmbeddedKafka.KafkaMessageListenerContainerLifecycleHandler.class
})
@ExtendWith(AutoConfigureEmbeddedKafka.EmbeddedKafkaExtension.class)
public @interface AutoConfigureEmbeddedKafka {

@RequiredArgsConstructor
@Getter(AccessLevel.PRIVATE)
class KafkaTestContext<K, V> {

private final BlockingQueue<ConsumerRecord<K, V>> records;
private final KafkaMessageListenerContainer<K, V> container;

@SneakyThrows
public Optional<ConsumerRecord<K, V>> poll(long timeout, TimeUnit unit) {
return Optional.ofNullable(this.records.poll(timeout, unit));
}

}


@TestConfiguration
class EmbeddedKafkaConfiguration {

@Bean
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
KafkaTestContext<?, ?> createKafkaTestContext(EmbeddedKafkaBroker kafka, List<NewTopic> topics) {
final var consumerFactory = new DefaultKafkaConsumerFactory<>(
Map.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBrokersAsString(),
ConsumerConfig.GROUP_ID_CONFIG, "consumer",
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true",
ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10",
ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60000",
// not needed, but must not be null
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, UUIDDeserializer.class,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class,
ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, CustomJsonDeserializer.class.getName(),
JsonDeserializer.TRUSTED_PACKAGES, "*",
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"
)
);
final var containerProperties = new ContainerProperties(
topics
.stream()
.map(NewTopic::name)
.toArray(String[]::new)
);
final var container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
final var records = new LinkedBlockingQueue<ConsumerRecord<Object, Object>>();
container.setupMessageListener((MessageListener<?, ?>) records::add);
return new KafkaTestContext<>(records, container);
}

}

@Component
@RequiredArgsConstructor
class KafkaMessageListenerContainerLifecycleHandler {

private final KafkaTestContext<?, ?> context;
private final EmbeddedKafkaBroker kafka;

@EventListener(ContextRefreshedEvent.class)
public void startup() {
context.getContainer().start();
ContainerTestUtils.waitForAssignment(context.getContainer(), kafka.getPartitionsPerTopic());
}

@EventListener(ContextClosedEvent.class)
public void shutdown() {
context.getContainer().stop();
}

}

// we need to reset the records between the tests
class EmbeddedKafkaExtension implements AfterEachCallback {

@Override
public void afterEach(ExtensionContext context) {
SpringExtension
.getApplicationContext(context)
.getBean(KafkaTestContext.class)
.getRecords()
.clear();
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package de.sample.schulung.accounts.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.springframework.kafka.support.JacksonUtils;
import org.springframework.kafka.support.serializer.JsonDeserializer;

public class CustomJsonDeserializer extends JsonDeserializer<Object> {

private static ObjectMapper createCustomObjectMapper() {
final var result = JacksonUtils.enhancedObjectMapper();
result.setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE);
result.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
result.registerModule(new JavaTimeModule());
return result;
}

public CustomJsonDeserializer() {
super(createCustomObjectMapper());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package de.sample.schulung.accounts.kafka;

import de.sample.schulung.accounts.domain.AutoConfigureDisabledInitialization;
import de.sample.schulung.accounts.domain.Customer;
import de.sample.schulung.accounts.domain.CustomersService;
import de.sample.schulung.accounts.kafka.AutoConfigureEmbeddedKafka.KafkaTestContext;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.jdbc.AutoConfigureTestDatabase;
import org.springframework.boot.test.context.SpringBootTest;

import java.time.LocalDate;
import java.time.Month;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.from;

@SpringBootTest
@AutoConfigureTestDatabase
@AutoConfigureEmbeddedKafka
@AutoConfigureDisabledInitialization
class CustomerEventsProducerTests {

@Autowired
CustomersService service;
@Autowired
KafkaTestContext<UUID, CustomerEventRecord> context;


@Test
void shouldProduceCustomerEventWhenCustomerIsCreated() {
var customer = new Customer();
customer.setName("Tom Mayer");
customer.setState(Customer.CustomerState.ACTIVE);
customer.setDateOfBirth(LocalDate.of(2000, Month.DECEMBER, 20));

service.createCustomer(customer);

assertThat(context.poll(3, TimeUnit.SECONDS))
.isPresent()
.get()
.returns(KafkaConstants.CUSTOMER_EVENTS_TOPIC, from(ConsumerRecord::topic))
.extracting(ConsumerRecord::value)
.returns("created", from(CustomerEventRecord::eventType))
.returns(customer.getUuid(), from(CustomerEventRecord::uuid))
.extracting(CustomerEventRecord::customer)
.returns("Tom Mayer", from(CustomerRecord::name));

}

}
4 changes: 2 additions & 2 deletions sample-requests/POST-customers.http
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ Content-Type: application/json
Accept: application/json

{
"name": "Tom Mayer",
"birthdate": "1985-07-30",
"name": "Julia Schmidt",
"birthdate": "1996-01-10",
"state": "active"
}

Expand Down
Loading