Commit
Fix exceptions management for single-threaded processing (#31)
gfinocchiaro authored Dec 16, 2024
1 parent 9b60c71 commit f534e4f
Showing 3 changed files with 620 additions and 15 deletions.
@@ -288,15 +288,20 @@ void consumeRecord(ConsumerRecord<K, V> record) {

switch (errorStrategy) {
case IGNORE_AND_CONTINUE -> {
logger.atWarn().log("Ignoring error");
// Log the error here to catch the stack trace
logger.atWarn().setCause(ve).log("Ignoring error");
offsetService.updateOffsets(record);
}

case FORCE_UNSUBSCRIPTION -> {
+ // Do not log the error here because it will be fully logged from the consuming loop
logger.atWarn().log("Forcing unsubscription");
throw new KafkaException(ve);
}
}
+ } catch (Throwable t) {
+ logger.atError().log("Serious error while processing record!");
+ throw new KafkaException(t);
}
}
}
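Pieced together, the reworked error path of consumeRecord reads roughly as below. This is a sketch for orientation only: the opening try block and the actual processing call sit above the visible hunk, so their shape (including the processor.process name) is assumed.

```java
void consumeRecord(ConsumerRecord<K, V> record) {
    try {
        // Assumed: the real processing logic lives above the visible hunk.
        processor.process(record);
        offsetService.updateOffsets(record);
    } catch (ValueException ve) {
        switch (errorStrategy) {
            case IGNORE_AND_CONTINUE -> {
                // Log the error here to catch the stack trace
                logger.atWarn().setCause(ve).log("Ignoring error");
                offsetService.updateOffsets(record);
            }
            case FORCE_UNSUBSCRIPTION -> {
                // Do not log here; the consuming loop fully logs it
                logger.atWarn().log("Forcing unsubscription");
                throw new KafkaException(ve);
            }
        }
    } catch (Throwable t) {
        // New with this commit: non-ValueException failures no longer slip
        // through silently; they are logged and rethrown so the consuming
        // loop can react.
        logger.atError().log("Serious error while processing record!");
        throw new KafkaException(t);
    }
}
```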
@@ -654,6 +654,7 @@ static Stream<Arguments> handleErrors() {
return Stream.of(
Arguments.of(1, ValueException.fieldNotFound("field")),
Arguments.of(2, ValueException.fieldNotFound("field")),
+ Arguments.of(1, new RuntimeException("Serious issue")),
Arguments.of(2, new RuntimeException("Serious issue")));
}

@@ -666,7 +667,7 @@ public void shouldHandleErrors(int numOfThreads, RuntimeException exception) {
ConsumerRecords<String, String> records = generateRecords("topic", 30, keys, partitions);

// Prepare the list of offsets that will trigger a ValueException upon processing
- List<ConsumedRecordInfo> offsetsTriggeringException =
+ List<ConsumedRecordInfo> offendingOffsets =
List.of(
new ConsumedRecordInfo("topic", 0, 2l),
new ConsumedRecordInfo("topic", 0, 4l),
@@ -676,21 +677,26 @@ public void shouldHandleErrors(int numOfThreads, RuntimeException exception) {
RecordConsumer<String, String> recordConsumer =
RecordConsumer.<String, String>recordProcessor(
new MockRecordProcessor<String, String>(
- exception, offsetsTriggeringException))
+ exception, offendingOffsets))
.offsetService(offsetService)
.errorStrategy(RecordErrorHandlingStrategy.FORCE_UNSUBSCRIPTION)
.logger(logger)
.threads(numOfThreads)
- // This enforse usage of the SingleThreadedConsume if numOfThreads is 1
- // .preferSingleThread(true)
+ // This enforces usage of the SingleThreadedConsumer if numOfThreads is 1
+ .preferSingleThread(true)
.build();

assertThrows(KafkaException.class, () -> recordConsumer.consumeRecords(records));

// Ensure that the committed offsets do not include the broken ones.
List<ConsumedRecordInfo> consumedRecords = offsetService.getConsumedRecords();
- assertThat(consumedRecords).hasSize(records.count() - offsetsTriggeringException.size());
- assertThat(consumedRecords).containsNoneIn(offsetsTriggeringException);
+ // For single-threaded processing, processing will stop upon the first failure, therefore only
+ // the first two records (offsets 0l and 1l) will be processed.
+ // For concurrent processing, processing won't stop upon the first failure, therefore we expect
+ // to find only the "good" offsets.
+ int expectedNumOfProcessedRecords =
+     numOfThreads == 1 ? 2 : records.count() - offendingOffsets.size();
+ assertThat(consumedRecords).hasSize(expectedNumOfProcessedRecords);
+ assertThat(consumedRecords).containsNoneIn(offendingOffsets);
}
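The MockRecordProcessor driving this test is not part of the diff; the assertions only hold if it throws the configured exception exactly for the records listed in offendingOffsets. A hypothetical sketch of that behavior follows; the class shape, the process method name, and the ConsumedRecordInfo accessors are all assumptions, not the repository's actual code.

```java
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;

// Hypothetical sketch; the real MockRecordProcessor lives elsewhere in the test sources.
class MockRecordProcessor<K, V> {

    private final RuntimeException exception;
    private final List<ConsumedRecordInfo> offending;

    MockRecordProcessor(RuntimeException exception, List<ConsumedRecordInfo> offending) {
        this.exception = exception;
        this.offending = offending;
    }

    void process(ConsumerRecord<K, V> record) {
        // Fail with the configured exception on "offending" records, succeed otherwise.
        boolean offends =
                offending.stream()
                        .anyMatch(o -> o.topic().equals(record.topic())
                                && o.partition() == record.partition()
                                && o.offset() == record.offset());
        if (offends) {
            throw exception;
        }
    }
}
```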

@ParameterizedTest
@@ -703,7 +709,7 @@ public void shouldIgnoreErrorsOnlyIfValueException(
ConsumerRecords<String, String> records = generateRecords("topic", 30, keys, partitions);

// Prepare the list of offsets that will trigger a ValueException upon processing
- List<ConsumedRecordInfo> offsetsTriggeringException =
+ List<ConsumedRecordInfo> offendingOffsets =
List.of(
new ConsumedRecordInfo("topic", 0, 2l),
new ConsumedRecordInfo("topic", 0, 4l),
@@ -713,27 +719,28 @@
RecordConsumer<String, String> recordConsumer =
RecordConsumer.<String, String>recordProcessor(
new MockRecordProcessor<String, String>(
- exception, offsetsTriggeringException))
+ exception, offendingOffsets))
.offsetService(offsetService)
// The following prevents the exception from being propagated
.errorStrategy(RecordErrorHandlingStrategy.IGNORE_AND_CONTINUE)
.logger(logger)
.threads(numOfThreads)
- // This enforse usage of the SingleThreadedConsume if numOfThreads is 1
+ // This enforces usage of the SingleThreadedConsumer if numOfThreads is 1
.preferSingleThread(true)
.build();

if (exception instanceof ValueException) {
recordConsumer.consumeRecords(records);
- // Ensure that all offsets are commited
+ // Ensure that all offsets are committed (even the offending ones)
List<ConsumedRecordInfo> consumedRecords = offsetService.getConsumedRecords();
assertThat(consumedRecords).hasSize(records.count());
} else {
assertThrows(KafkaException.class, () -> recordConsumer.consumeRecords(records));
List<ConsumedRecordInfo> consumedRecords = offsetService.getConsumedRecords();
- assertThat(consumedRecords)
-     .hasSize(records.count() - offsetsTriggeringException.size());
- assertThat(consumedRecords).containsNoneIn(offsetsTriggeringException);
+ int expectedNumOfProcessedRecords =
+     numOfThreads == 1 ? 2 : records.count() - offendingOffsets.size();
+ assertThat(consumedRecords).hasSize(expectedNumOfProcessedRecords);
+ assertThat(consumedRecords).containsNoneIn(offendingOffsets);
}
}
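Taken together, the two tests pin down the contract this commit fixes for single-threaded consumption. A condensed recap of the expected outcomes (the full offendingOffsets list is truncated in the hunks above, so its size is left symbolic):

```java
// IGNORE_AND_CONTINUE + ValueException: nothing escapes, and every offset is
// committed, including the offending ones.
recordConsumer.consumeRecords(records);
assertThat(offsetService.getConsumedRecords()).hasSize(records.count());

// FORCE_UNSUBSCRIPTION, or any non-ValueException failure: a KafkaException is
// thrown. With one thread, processing halts at the first offending offset (2l),
// so only offsets 0l and 1l end up committed; with multiple threads, all
// non-offending records may still be processed.
assertThrows(KafkaException.class, () -> recordConsumer.consumeRecords(records));
assertThat(offsetService.getConsumedRecords())
        .hasSize(numOfThreads == 1 ? 2 : records.count() - offendingOffsets.size());
```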

