diff --git a/account-service-provider/src/test/java/de/sample/schulung/accounts/AccountsApiTests.java b/account-service-provider/src/test/java/de/sample/schulung/accounts/AccountsApiTests.java
index ca970ed..908b335 100644
--- a/account-service-provider/src/test/java/de/sample/schulung/accounts/AccountsApiTests.java
+++ b/account-service-provider/src/test/java/de/sample/schulung/accounts/AccountsApiTests.java
@@ -1,5 +1,6 @@
package de.sample.schulung.accounts;
+import de.sample.schulung.accounts.domain.AutoConfigureDisabledInitialization;
import de.sample.schulung.accounts.kafka.AutoConfigureKafkaTemplateMock;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
@@ -20,6 +21,7 @@
@AutoConfigureMockMvc
@AutoConfigureTestDatabase
@AutoConfigureKafkaTemplateMock
+@AutoConfigureDisabledInitialization
class AccountsApiTests {
@Autowired
diff --git a/account-service-provider/src/test/java/de/sample/schulung/accounts/IndexPageTests.java b/account-service-provider/src/test/java/de/sample/schulung/accounts/IndexPageTests.java
index 8ad13da..2883369 100644
--- a/account-service-provider/src/test/java/de/sample/schulung/accounts/IndexPageTests.java
+++ b/account-service-provider/src/test/java/de/sample/schulung/accounts/IndexPageTests.java
@@ -1,5 +1,6 @@
package de.sample.schulung.accounts;
+import de.sample.schulung.accounts.domain.AutoConfigureDisabledInitialization;
import de.sample.schulung.accounts.kafka.AutoConfigureKafkaTemplateMock;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
@@ -17,6 +18,7 @@
@AutoConfigureMockMvc
@AutoConfigureTestDatabase
@AutoConfigureKafkaTemplateMock
+@AutoConfigureDisabledInitialization
public class IndexPageTests {
@Autowired
diff --git a/account-service-provider/src/test/java/de/sample/schulung/accounts/boundary/AccountsBoundaryTests.java b/account-service-provider/src/test/java/de/sample/schulung/accounts/boundary/AccountsBoundaryTests.java
index 8d9b5ac..9ccfa7e 100644
--- a/account-service-provider/src/test/java/de/sample/schulung/accounts/boundary/AccountsBoundaryTests.java
+++ b/account-service-provider/src/test/java/de/sample/schulung/accounts/boundary/AccountsBoundaryTests.java
@@ -1,5 +1,6 @@
package de.sample.schulung.accounts.boundary;
+import de.sample.schulung.accounts.domain.AutoConfigureDisabledInitialization;
import de.sample.schulung.accounts.domain.CustomersService;
import de.sample.schulung.accounts.domain.NotFoundException;
import de.sample.schulung.accounts.kafka.AutoConfigureKafkaTemplateMock;
@@ -30,6 +31,7 @@
@AutoConfigureMockMvc
@AutoConfigureTestDatabase
@AutoConfigureKafkaTemplateMock
+@AutoConfigureDisabledInitialization
public class AccountsBoundaryTests {
@Autowired
diff --git a/account-service-provider/src/test/java/de/sample/schulung/accounts/domain/AutoConfigureDisabledInitialization.java b/account-service-provider/src/test/java/de/sample/schulung/accounts/domain/AutoConfigureDisabledInitialization.java
new file mode 100644
index 0000000..62ccb21
--- /dev/null
+++ b/account-service-provider/src/test/java/de/sample/schulung/accounts/domain/AutoConfigureDisabledInitialization.java
@@ -0,0 +1,17 @@
+package de.sample.schulung.accounts.domain;
+
+import org.springframework.test.context.TestPropertySource;
+
+import java.lang.annotation.*;
+
+@Documented
+@Inherited
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+@TestPropertySource(
+ properties = """
+ application.customers.initialization.enabled=false
+ """
+)
+public @interface AutoConfigureDisabledInitialization {
+}
diff --git a/account-service-provider/src/test/java/de/sample/schulung/accounts/domain/AutoConfigureEnabledInitialization.java b/account-service-provider/src/test/java/de/sample/schulung/accounts/domain/AutoConfigureEnabledInitialization.java
new file mode 100644
index 0000000..d1c1ef4
--- /dev/null
+++ b/account-service-provider/src/test/java/de/sample/schulung/accounts/domain/AutoConfigureEnabledInitialization.java
@@ -0,0 +1,17 @@
+package de.sample.schulung.accounts.domain;
+
+import org.springframework.test.context.TestPropertySource;
+
+import java.lang.annotation.*;
+
+@Documented
+@Inherited
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+@TestPropertySource(
+ properties = """
+ application.customers.initialization.enabled=true
+ """
+)
+public @interface AutoConfigureEnabledInitialization {
+}
diff --git a/account-service-provider/src/test/java/de/sample/schulung/accounts/domain/CustomersInitializerTests.java b/account-service-provider/src/test/java/de/sample/schulung/accounts/domain/CustomersInitializerTests.java
index d4d57d5..8e52d31 100644
--- a/account-service-provider/src/test/java/de/sample/schulung/accounts/domain/CustomersInitializerTests.java
+++ b/account-service-provider/src/test/java/de/sample/schulung/accounts/domain/CustomersInitializerTests.java
@@ -17,6 +17,7 @@
)
@AutoConfigureTestDatabase
@AutoConfigureKafkaTemplateMock
+@AutoConfigureEnabledInitialization
public class CustomersInitializerTests {
@MockBean
diff --git a/account-service-provider/src/test/java/de/sample/schulung/accounts/domain/CustomersServiceTest.java b/account-service-provider/src/test/java/de/sample/schulung/accounts/domain/CustomersServiceTest.java
index b1cedd6..17a8674 100644
--- a/account-service-provider/src/test/java/de/sample/schulung/accounts/domain/CustomersServiceTest.java
+++ b/account-service-provider/src/test/java/de/sample/schulung/accounts/domain/CustomersServiceTest.java
@@ -14,6 +14,7 @@
@SpringBootTest
@AutoConfigureTestDatabase
@AutoConfigureKafkaTemplateMock
+@AutoConfigureDisabledInitialization
public class CustomersServiceTest {
@Autowired
diff --git a/account-service-provider/src/test/java/de/sample/schulung/accounts/kafka/AutoConfigureEmbeddedKafka.java b/account-service-provider/src/test/java/de/sample/schulung/accounts/kafka/AutoConfigureEmbeddedKafka.java
new file mode 100644
index 0000000..a4ea366
--- /dev/null
+++ b/account-service-provider/src/test/java/de/sample/schulung/accounts/kafka/AutoConfigureEmbeddedKafka.java
@@ -0,0 +1,155 @@
+package de.sample.schulung.accounts.kafka;
+
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.UUIDDeserializer;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Import;
+import org.springframework.context.event.ContextClosedEvent;
+import org.springframework.context.event.ContextRefreshedEvent;
+import org.springframework.context.event.EventListener;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.listener.ContainerProperties;
+import org.springframework.kafka.listener.KafkaMessageListenerContainer;
+import org.springframework.kafka.listener.MessageListener;
+import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.test.EmbeddedKafkaBroker;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.kafka.test.utils.ContainerTestUtils;
+import org.springframework.stereotype.Component;
+import org.springframework.test.context.TestPropertySource;
+import org.springframework.test.context.junit.jupiter.SpringExtension;
+
+import java.lang.annotation.*;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Auto-configures an {@link EmbeddedKafka}
+ * and provides an extension to run, reset and stop the container.
+ * We can get the following beans injected into our test class
+ *
+ * \u0040Autowired + * EmbeddedKafkaBroker kafka; + * \u0040Autowired + * KafkaTestContext<Key,Value> context; + *+ */ +@Documented +@Inherited +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.TYPE) +// Kafka Configuration +@EmbeddedKafka +@TestPropertySource( + properties = """ + spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers} + # we disable this in production, but need this for the tests + spring.kafka.producer.properties."[spring.json.add.type.headers]"=true + """ +) +@Import({ + AutoConfigureEmbeddedKafka.EmbeddedKafkaConfiguration.class, + AutoConfigureEmbeddedKafka.KafkaMessageListenerContainerLifecycleHandler.class +}) +@ExtendWith(AutoConfigureEmbeddedKafka.EmbeddedKafkaExtension.class) +public @interface AutoConfigureEmbeddedKafka { + + @RequiredArgsConstructor + @Getter(AccessLevel.PRIVATE) + class KafkaTestContext