diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Channel.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Channel.java index d6fc5e6e..7f8d6166 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Channel.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Channel.java @@ -49,6 +49,9 @@ public abstract class Channel { private static final Logger LOG = LoggerFactory.getLogger(Channel.class); + // 30 seconds set as minimum for Amazon MSK to connect successfully + protected static final int INITIAL_CONNECTION_TIMEOUT_MS = 30000; + private final String controlTopic; private final String groupId; private final Producer producer; diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CommitterImpl.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CommitterImpl.java index 10730d99..1a060288 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CommitterImpl.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CommitterImpl.java @@ -100,7 +100,7 @@ private CommitterImpl( consumeAvailable( // initial poll with longer duration so the consumer will initialize... - Duration.ofMillis(1000), + Duration.ofMillis(INITIAL_CONNECTION_TIMEOUT_MS), envelope -> receive( envelope, diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Coordinator.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Coordinator.java index b7204c85..7bdba5f0 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Coordinator.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Coordinator.java @@ -90,7 +90,7 @@ public Coordinator( this.commitState = new CommitState(config); // initial poll with longer duration so the consumer will initialize... - consumeAvailable(Duration.ofMillis(1000), this::receive); + consumeAvailable(Duration.ofMillis(INITIAL_CONNECTION_TIMEOUT_MS), this::receive); } public void process() {