diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java
index 488190d920..17704018ed 100644
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java
@@ -21,6 +21,7 @@
 import org.apache.kafka.clients.admin.KafkaAdminClient;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.BrokerNotAvailableException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.serialization.StringDeserializer;
@@ -82,6 +83,8 @@
 @SuppressWarnings("deprecation")
 @DataPrepperPlugin(name = "kafka", pluginType = Source.class, pluginConfigurationType = KafkaSourceConfig.class)
 public class KafkaSource implements Source<Record<Event>> {
+    int MAX_RETRY_NEW_CONSUMER_ATTEMPTS = 3;
+    long RETRY_SLEEP_INTERVAL = 30000;
     private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
     private final KafkaSourceConfig sourceConfig;
     private AtomicBoolean shutdownInProgress;
@@ -130,22 +133,30 @@ public void start(Buffer<Record<Event>> buffer) {
             allTopicExecutorServices.add(executorService);
 
             IntStream.range(0, numWorkers).forEach(index -> {
-                switch (schema) {
-                    case JSON:
-                        kafkaConsumer = new KafkaConsumer(consumerProperties);
+                int retryCount = 0;
+                while (true) {
+                    try {
+                        kafkaConsumer = createKafkaConsumer(schema, consumerProperties);
                         break;
-                    case AVRO:
-                        kafkaConsumer = new KafkaConsumer(consumerProperties);
-                        break;
-                    case PLAINTEXT:
-                    default:
-                        glueDeserializer = KafkaSourceSecurityConfigurer.getGlueSerializer(sourceConfig);
-                        if (Objects.nonNull(glueDeserializer)) {
-                            kafkaConsumer = new KafkaConsumer(consumerProperties, stringDeserializer, glueDeserializer);
+                    } catch (ConfigException ce) {
+                        if (ce.getMessage().contains("No resolvable bootstrap urls given in bootstrap.servers")) {
+                            retryCount++;
+                            if (retryCount >= MAX_RETRY_NEW_CONSUMER_ATTEMPTS) {
+                                LOG.error("Exceeded maximum retry attempts to setup the Kafka Consumer. Giving up.");
+                                throw new RuntimeException(ce);
+                            }
+                            LOG.warn("Bootstrap URL could not be resolved. Retrying in " + RETRY_SLEEP_INTERVAL + "ms...");
+                            try {
+                                sleep(RETRY_SLEEP_INTERVAL);
+                            } catch (InterruptedException ie) {
+                                Thread.currentThread().interrupt();
+                                throw new RuntimeException(ie);
+                            }
                         } else {
-                            kafkaConsumer = new KafkaConsumer(consumerProperties);
+                            throw ce;
                         }
-                        break;
+                    }
+                }
                 consumer = new KafkaSourceCustomConsumer(kafkaConsumer, shutdownInProgress, buffer, sourceConfig, topic, schemaType, acknowledgementSetManager, topicMetrics);
                 allTopicConsumers.add(consumer);
@@ -165,6 +176,23 @@ public void start(Buffer<Record<Event>> buffer) {
         });
     }
 
+    public KafkaConsumer createKafkaConsumer(MessageFormat schema, Properties consumerProperties) {
+        switch (schema) {
+            case JSON:
+                return new KafkaConsumer(consumerProperties);
+            case AVRO:
+                return new KafkaConsumer(consumerProperties);
+            case PLAINTEXT:
+            default:
+                glueDeserializer = KafkaSourceSecurityConfigurer.getGlueSerializer(sourceConfig);
+                if (Objects.nonNull(glueDeserializer)) {
+                    return new KafkaConsumer(consumerProperties, stringDeserializer, glueDeserializer);
+                } else {
+                    return new KafkaConsumer(consumerProperties);
+                }
+        }
+    }
+
     @Override
     public void stop() {
         shutdownInProgress.set(true);
@@ -485,4 +513,8 @@ private String getMaskedBootStrapDetails(String serverIP) {
         }
         return maskedString.append(serverIP.substring(maskedLength)).toString();
     }
+
+    protected void sleep(long millis) throws InterruptedException {
+        Thread.sleep(millis);
+    }
 }
\ No newline at end of file
diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceTest.java
index 05843ed1a9..4608d33fd0 100644
--- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceTest.java
+++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceTest.java
@@ -5,6 +5,7 @@
 
 package org.opensearch.dataprepper.plugins.kafka.source;
 
+import org.apache.kafka.common.config.ConfigException;
 import org.opensearch.dataprepper.model.buffer.Buffer;
 import org.opensearch.dataprepper.model.record.Record;
 import org.opensearch.dataprepper.model.event.Event;
@@ -25,7 +26,13 @@
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.mockito.junit.jupiter.MockitoSettings;
 import org.mockito.quality.Strictness;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -135,4 +142,30 @@ void test_kafkaSource_basicFunctionality() {
         kafkaSource.start(buffer);
         assertTrue(Objects.nonNull(kafkaSource.getConsumer()));
     }
+
+    @Test
+    void test_kafkaSource_retry_consumer_create() throws InterruptedException {
+        when(topic1.getSessionTimeOut()).thenReturn(Duration.ofSeconds(15));
+        when(topic2.getSessionTimeOut()).thenReturn(Duration.ofSeconds(15));
+        kafkaSource = spy(createObjectUnderTest());
+        doNothing().when(kafkaSource).sleep(anyLong());
+
+        doThrow(new ConfigException("No resolvable bootstrap urls given in bootstrap.servers"))
+                .doCallRealMethod()
+                .when(kafkaSource)
+                .createKafkaConsumer(any(), any());
+        kafkaSource.start(buffer);
+    }
+
+    @Test
+    void test_kafkaSource_max_retries() throws InterruptedException {
+        when(topic1.getSessionTimeOut()).thenReturn(Duration.ofSeconds(15));
+        when(topic2.getSessionTimeOut()).thenReturn(Duration.ofSeconds(15));
+        kafkaSource = spy(createObjectUnderTest());
+        doNothing().when(kafkaSource).sleep(anyLong());
+
+        doThrow(new ConfigException("No resolvable bootstrap urls given in bootstrap.servers"))
+                .when(kafkaSource)
+                .createKafkaConsumer(any(), any());
+        Assertions.assertThrows(Exception.class, () -> kafkaSource.start(buffer));
+    }
 }
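
Note for reviewers: the change above boils down to a bounded retry around consumer creation, retrying only when the `ConfigException` is the "No resolvable bootstrap urls given in bootstrap.servers" case and rethrowing every other configuration error. Below is a minimal, self-contained sketch of that pattern outside the Data Prepper codebase; the class name `RetryingConsumerCreator` and its constants are illustrative only and not part of this PR.

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.ConfigException;

// Illustrative sketch of the bounded-retry pattern used in KafkaSource.start() above.
class RetryingConsumerCreator {
    static final int MAX_ATTEMPTS = 3;      // mirrors MAX_RETRY_NEW_CONSUMER_ATTEMPTS
    static final long SLEEP_MILLIS = 30000; // mirrors RETRY_SLEEP_INTERVAL

    KafkaConsumer<String, String> create(final Properties props) {
        int attempts = 0;
        while (true) {
            try {
                // KafkaSource delegates to createKafkaConsumer(schema, consumerProperties) here.
                return new KafkaConsumer<>(props);
            } catch (ConfigException ce) {
                // Retry only the transient "unresolvable bootstrap URL" case; any other
                // ConfigException is a genuine configuration error and is rethrown as-is.
                if (ce.getMessage() == null
                        || !ce.getMessage().contains("No resolvable bootstrap urls given in bootstrap.servers")) {
                    throw ce;
                }
                attempts++;
                if (attempts >= MAX_ATTEMPTS) {
                    throw new RuntimeException(ce);
                }
                try {
                    Thread.sleep(SLEEP_MILLIS);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(ie);
                }
            }
        }
    }
}
```

Keeping the wait in a small `protected sleep(long)` method rather than calling `Thread.sleep` inline is what lets the new unit tests stub the 30-second delay with `doNothing().when(kafkaSource).sleep(anyLong())` and exercise both the retry-then-succeed and the give-up-after-max-attempts paths quickly.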