-
Notifications
You must be signed in to change notification settings - Fork 206
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add retry to Kafka Consumer Create in source #3399
Add retry to Kafka Consumer Create in source #3399
Conversation
...kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java
Outdated
Show resolved
Hide resolved
...kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java
Outdated
Show resolved
Hide resolved
...kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java
Outdated
Show resolved
Hide resolved
3d3977d
to
5e316e9
Compare
@@ -82,6 +83,8 @@ | |||
@SuppressWarnings("deprecation") | |||
@DataPrepperPlugin(name = "kafka", pluginType = Source.class, pluginConfigurationType = KafkaSourceConfig.class) | |||
public class KafkaSource implements Source<Record<Event>> { | |||
public static final String NO_RESOLVABLE_URLS_ERROR_MESSAGE = "No resolvable bootstrap urls given in bootstrap.servers"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this need to be public?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed 👍 intellij auto generate got me
kafkaConsumer = new KafkaConsumer(consumerProperties, stringDeserializer, glueDeserializer); | ||
} catch (ConfigException ce) { | ||
if (ce.getMessage().contains(NO_RESOLVABLE_URLS_ERROR_MESSAGE)) { | ||
LOG.warn(ce.getMessage()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any reason not to provide context with this error mesage?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed 👍
} catch (ConfigException ce) { | ||
if (ce.getMessage().contains(NO_RESOLVABLE_URLS_ERROR_MESSAGE)) { | ||
LOG.warn(ce.getMessage()); | ||
LOG.warn("Bootstrap URL could not be resolved. Retrying in " + RETRY_SLEEP_INTERVAL + "ms..."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: It is better to inject variables to logs with LOG.warn("Bootstrap URL could not be resolved. Retrying in {} ms...", RETRY_SLEEP_INTERVAL);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, fixed
5e316e9
to
6abdc44
Compare
Signed-off-by: Jonah Calvo <[email protected]>
6abdc44
to
e2aaf35
Compare
Signed-off-by: Jonah Calvo <[email protected]> (cherry picked from commit 49bf461)
Signed-off-by: Jonah Calvo <[email protected]> (cherry picked from commit 49bf461) Co-authored-by: Jonah Calvo <[email protected]>
Description
Certain error was seen when Kafka Source was constructed using new MSK instance:
Adding 3 retries of 30 seconds each to the Kafka consumer construction to account for Route53 propagation delays.
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.