Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add retry to Kafka Consumer Create in source #3399

Merged

Conversation

JonahCalvo
Copy link
Contributor

@JonahCalvo JonahCalvo commented Sep 28, 2023

Description

Certain error was seen when Kafka Source was constructed using new MSK instance:

Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
    at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:89) ~[kafka-clients-7.4.0-ccs.jar:?]
    at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:48) ~[kafka-clients-7.4.0-ccs.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:731) ~[kafka-clients-7.4.0-ccs.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:665) ~[kafka-clients-7.4.0-ccs.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:646) ~[kafka-clients-7.4.0-ccs.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:626) ~[kafka-clients-7.4.0-ccs.jar:?]
    at org.opensearch.dataprepper.plugins.kafka.source.KafkaSource.lambda$start$0(KafkaSource.java:146) ~[kafka-plugins-2.4.0.jar:?]
    at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:104) ~[?:?]
    at java.util.stream.IntPipeline$Head.forEach(IntPipeline.java:593) ~[?:?]
    at org.opensearch.dataprepper.plugins.kafka.source.KafkaSource.lambda$start$1(KafkaSource.java:132) ~[kafka-plugins-2.4.0.jar:?]
    at java.util.ArrayList.forEach(ArrayList.java:1541) ~[?:?]
    at org.opensearch.dataprepper.plugins.kafka.source.KafkaSource.start(KafkaSource.java:122) ~[kafka-plugins-2.4.0.jar:?]
    at org.opensearch.dataprepper.pipeline.Pipeline.startSourceAndProcessors(Pipeline.java:210) ~[data-prepper-core-2.4.0.jar:?]
    at org.opensearch.dataprepper.pipeline.Pipeline.lambda$execute$2(Pipeline.java:251) ~[data-prepper-core-2.4.0.jar:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
    ... 2 more

Adding 3 retries of 30 seconds each to the Kafka consumer construction to account for Route53 propagation delays.

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@JonahCalvo JonahCalvo changed the title Add retry to Kafka Consumer Create in sink Add retry to Kafka Consumer Create in source Sep 28, 2023
graytaylor0
graytaylor0 previously approved these changes Sep 28, 2023
@@ -82,6 +83,8 @@
@SuppressWarnings("deprecation")
@DataPrepperPlugin(name = "kafka", pluginType = Source.class, pluginConfigurationType = KafkaSourceConfig.class)
public class KafkaSource implements Source<Record<Event>> {
public static final String NO_RESOLVABLE_URLS_ERROR_MESSAGE = "No resolvable bootstrap urls given in bootstrap.servers";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be public?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed 👍 intellij auto generate got me

kafkaConsumer = new KafkaConsumer(consumerProperties, stringDeserializer, glueDeserializer);
} catch (ConfigException ce) {
if (ce.getMessage().contains(NO_RESOLVABLE_URLS_ERROR_MESSAGE)) {
LOG.warn(ce.getMessage());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason not to provide context with this error mesage?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed 👍

} catch (ConfigException ce) {
if (ce.getMessage().contains(NO_RESOLVABLE_URLS_ERROR_MESSAGE)) {
LOG.warn(ce.getMessage());
LOG.warn("Bootstrap URL could not be resolved. Retrying in " + RETRY_SLEEP_INTERVAL + "ms...");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: It is better to inject variables to logs with LOG.warn("Bootstrap URL could not be resolved. Retrying in {} ms...", RETRY_SLEEP_INTERVAL);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, fixed

graytaylor0
graytaylor0 previously approved these changes Sep 28, 2023
@asifsmohammed asifsmohammed merged commit 49bf461 into opensearch-project:main Sep 29, 2023
28 checks passed
opensearch-trigger-bot bot pushed a commit that referenced this pull request Sep 29, 2023
Signed-off-by: Jonah Calvo <[email protected]>
(cherry picked from commit 49bf461)
dlvenable pushed a commit that referenced this pull request Oct 5, 2023
Signed-off-by: Jonah Calvo <[email protected]>
(cherry picked from commit 49bf461)

Co-authored-by: Jonah Calvo <[email protected]>
@JonahCalvo JonahCalvo deleted the retry_msk_connection branch October 10, 2023 18:48
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants