-
Notifications
You must be signed in to change notification settings - Fork 205
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix for kafka source not committing offsets issue #3231 #3232
Fix for kafka source not committing offsets issue #3231 #3232
Conversation
if (partitionEpoch == 0) { | ||
LOG.info("Skipping partition {}, lost ownership", topicPartition); | ||
if (acknowledgementsEnabled && partitionEpoch == 0) { | ||
//ToDo: Add metric |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would the metric here be useful for debugging? I'm wondering if it's worth just adding now
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't have time to add a new metric. I'll see if I can add now, but there are other metrics and logs from which it can be inferred if we are hitting this condition. Ideally, it should not happen and an assert is more appropriate here.
final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions); | ||
for (TopicPartition topicPartition : partitions) { | ||
final OffsetAndMetadata offsetAndMetadata = committedOffsets.get(topicPartition); | ||
LOG.info("Assigned partition {} with offsets: beginningOffset: {}, endOffset: {}, committedOffset: {}", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor: this might be better off as a debug log rather than info
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is very crucial log for debugging, I wish we had it from the beginning. If debug shows up by default then I can change it to debug level else leave it as INFO.
stopConsumer(topicExecutorService.get(topic), topicConsumer.get(topic)); | ||
}); | ||
LOG.info("Consumer shutdown successfully..."); | ||
final long shutdownWaitTime = calculateLongestThreadWaitingTime(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the upper limit on this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking at calculateLongestThreadWaitingTime() implementation it looks like it can be atmost 5 seconds (default value) unless user has configured to be higher number.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the contribution @hshardeesi !
There is a lot here that could be unit tested to ensure that the behavior remains correct.
@@ -68,6 +90,7 @@ public OffsetAndMetadata addCompletedOffsets(final Range<Long> offsetRange) { | |||
Long maxValue = offsetMinMap.get(committedOffset).getMaximum(); | |||
if (maxValue != committedOffset) { | |||
offsetMinMap.remove(committedOffset); | |||
committedRecordCount += (maxValue - committedOffset); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These are fairly critical calculations. Can you add unit tests for these with some of the difference scenarios? e.g. start at 0, start at non-zero.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure I'll add unit test for this and other scenarios you pointed out below.
LOG.info("Assigned partition {} with offsets: beginningOffset: {}, endOffset: {}, committedOffset: {}", | ||
topicPartition, getTopicPartitionOffset(beginningOffsets, topicPartition), | ||
getTopicPartitionOffset(endOffsets, topicPartition), | ||
Objects.isNull(offsetAndMetadata) ? "-" : offsetAndMetadata.offset()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This line isn't assigning anything. I think you can remove it.
@@ -238,6 +232,21 @@ void processAcknowledgedOffsets() { | |||
acknowledgedOffsets.clear(); | |||
} | |||
|
|||
private void updateCommitCount(final TopicPartition topicPartition, final OffsetAndMetadata offsetAndMetadata) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It appears this only updates the metric, but the name implies it updates an actual count. Perhaps rename to updateCommitCountMetric
.
partitionCommitTrackerMap.get(partitionId).addCompletedOffsets(offsetRange.getOffsets()); | ||
updateOffsetsToCommit(partition, offsetAndMetadata); | ||
} else { | ||
LOG.error("!! Commit tracker not found for topic: {} partition: {}", partition.topic(), partitionId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need these exclamation marks. The log level is ERROR
which is sufficient here.
updateOffsetsToCommit(partition, | ||
new OffsetAndMetadata(offsetRange.getOffsets().getMaximum() + 1), | ||
offsetRange.getOffsets())); | ||
offsets.forEach((partition, offsetRange) -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should be unit testing this behavior.
|
…3231 Signed-off-by: Hardeep Singh <[email protected]>
79072b5
to
6129499
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for updating the tests!
Signed-off-by: Hardeep Singh <[email protected]> (cherry picked from commit 89e8f39)
Signed-off-by: Hardeep Singh <[email protected]> (cherry picked from commit 89e8f39) Co-authored-by: Hardeep Singh <[email protected]>
Description
This PR fixes following issues:
Following tests were conducted with these changes:
Issues Resolved
Resolves #3231
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.