From fe0ac8a181b041804660676041a164ae8b9a393d Mon Sep 17 00:00:00 2001 From: Gianluca Finocchiaro Date: Wed, 6 Nov 2024 16:00:44 +0100 Subject: [PATCH] Fix unhandled runtime exception in synchronous commit (#27) --- .../adapters/consumers/ConsumerLoop.java | 22 ++++++-- .../adapters/config/ConnectorConfigTest.java | 54 ++++++++++--------- 2 files changed, 47 insertions(+), 29 deletions(-) diff --git a/kafka-connector-project/kafka-connector/src/main/java/com/lightstreamer/kafka/adapters/consumers/ConsumerLoop.java b/kafka-connector-project/kafka-connector/src/main/java/com/lightstreamer/kafka/adapters/consumers/ConsumerLoop.java index a78c4373..d7d5e350 100644 --- a/kafka-connector-project/kafka-connector/src/main/java/com/lightstreamer/kafka/adapters/consumers/ConsumerLoop.java +++ b/kafka-connector-project/kafka-connector/src/main/java/com/lightstreamer/kafka/adapters/consumers/ConsumerLoop.java @@ -181,7 +181,7 @@ public void run() { log.atDebug().log("Kafka Consumer woken up"); } finally { log.atDebug().log("Start closing Kafka Consumer"); - offsetManager.commitSync(); + offsetManager.commitSyncAndIgnoreErrors(); consumer.close(); latch.countDown(); log.atDebug().log("Kafka Consumer closed"); @@ -330,8 +330,24 @@ public void onPartitionsRevoked(Collection partitions) { } void commitSync() { - consumer.commitSync(currentOffsets); - log.atInfo().log("Offsets commited"); + commitSync(false); + } + + void commitSyncAndIgnoreErrors() { + commitSync(true); + } + + private void commitSync(boolean ignoreErrors) { + try { + log.atDebug().log("Start commiting offset synchronously"); + consumer.commitSync(currentOffsets); + log.atInfo().log("Offsets commited"); + } catch (KafkaException e) { + log.atError().setCause(e).log("Unable to commit offsets"); + if (!ignoreErrors) { + throw e; + } + } } void commitAsync() { diff --git a/kafka-connector-project/kafka-connector/src/test/java/com/lightstreamer/kafka/adapters/config/ConnectorConfigTest.java b/kafka-connector-project/kafka-connector/src/test/java/com/lightstreamer/kafka/adapters/config/ConnectorConfigTest.java index 78d9a4ba..ac9c52a7 100644 --- a/kafka-connector-project/kafka-connector/src/test/java/com/lightstreamer/kafka/adapters/config/ConnectorConfigTest.java +++ b/kafka-connector-project/kafka-connector/src/test/java/com/lightstreamer/kafka/adapters/config/ConnectorConfigTest.java @@ -20,31 +20,10 @@ import static com.google.common.truth.Truth.assertThat; import static com.lightstreamer.kafka.adapters.config.specs.ConfigTypes.SslProtocol.TLSv12; import static com.lightstreamer.kafka.adapters.config.specs.ConfigTypes.SslProtocol.TLSv13; + import static org.junit.Assert.assertThrows; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.stream.Stream; - -import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.config.SaslConfigs; -import org.apache.kafka.common.config.SslConfigs; -import org.apache.kafka.common.security.auth.SecurityProtocol; -import org.junit.function.ThrowingRunnable; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.MethodSource; -import org.junit.jupiter.params.provider.ValueSource; - import com.lightstreamer.kafka.adapters.config.specs.ConfigTypes.EvaluatorType; import com.lightstreamer.kafka.adapters.config.specs.ConfigTypes.RecordComsumeFrom; import com.lightstreamer.kafka.adapters.config.specs.ConfigTypes.RecordErrorHandlingStrategy; @@ -61,6 +40,28 @@ import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.junit.function.ThrowingRunnable; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.stream.Stream; + public class ConnectorConfigTest { private Path adapterDir; @@ -311,8 +312,10 @@ private Map standardParameters() { standardParams.put(ConnectorConfig.CONSUMER_SESSION_TIMEOUT_MS, "800"); standardParams.put(ConnectorConfig.CONSUMER_MAX_POLL_INTERVAL_MS, "2000"); // Unmodifiable standardParams.put(ConnectorConfig.CONSUMER_METADATA_MAX_AGE_CONFIG, "250"); // Unmodifiable - standardParams.put(ConnectorConfig.CONSUMER_DEFAULT_API_TIMEOUT_MS_CONFIG, "1000"); // Unmodifiable - standardParams.put(ConnectorConfig.CONSUMER_REQUEST_TIMEOUT_MS_CONFIG, "15000"); // Unmodifiable + standardParams.put( + ConnectorConfig.CONSUMER_DEFAULT_API_TIMEOUT_MS_CONFIG, "1000"); // Unmodifiable + standardParams.put( + ConnectorConfig.CONSUMER_REQUEST_TIMEOUT_MS_CONFIG, "15000"); // Unmodifiable standardParams.put("item-template.template1", "template1-#{v=VALUE}"); standardParams.put("item-template.template2", "template2-#{v=OFFSET}"); standardParams.put("map.topic1.to", "template1"); @@ -504,8 +507,7 @@ public void shouldRetrieveBaseConsumerProperties() { ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "60000", ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, - "30000" - ); + "30000"); assertThat(baseConsumerProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG)) .startsWith("KAFKA-CONNECTOR-"); }