Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for kafka source not committing offsets issue #3231 #3232

Merged
merged 1 commit into from
Aug 24, 2023

Conversation

hshardeesi
Copy link
Contributor

Description

This PR fixes following issues:

  • Commit offsets when a kafka partition's beginning offset is non-zero.
  • Fix numRecordsCommitted metric.
  • Fix shutdown sequence for all topics+workers.

Following tests were conducted with these changes:

Ingest 10M records from a kafka topic + group membership mutations.
Ingest 12 M records from 2 kafka topics in single pipeline.
Ingest 10M records from 2 different topics in 2 different pipelines.
2 pipelines part of same consumer group ingesting 10 M records.
Shutdown pipeline in the middle of ingestion and resume and make sure all data is ingested.
Ingest data without acknowledgments enabled.
Ingest data with auto_offset_reset=earliest/latest.

Issues Resolved

Resolves #3231

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

if (partitionEpoch == 0) {
LOG.info("Skipping partition {}, lost ownership", topicPartition);
if (acknowledgementsEnabled && partitionEpoch == 0) {
//ToDo: Add metric
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would the metric here be useful for debugging? I'm wondering if it's worth just adding now

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't have time to add a new metric. I'll see if I can add now, but there are other metrics and logs from which it can be inferred if we are hitting this condition. Ideally, it should not happen and an assert is more appropriate here.

final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
for (TopicPartition topicPartition : partitions) {
final OffsetAndMetadata offsetAndMetadata = committedOffsets.get(topicPartition);
LOG.info("Assigned partition {} with offsets: beginningOffset: {}, endOffset: {}, committedOffset: {}",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: this might be better off as a debug log rather than info

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is very crucial log for debugging, I wish we had it from the beginning. If debug shows up by default then I can change it to debug level else leave it as INFO.

stopConsumer(topicExecutorService.get(topic), topicConsumer.get(topic));
});
LOG.info("Consumer shutdown successfully...");
final long shutdownWaitTime = calculateLongestThreadWaitingTime();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the upper limit on this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at calculateLongestThreadWaitingTime() implementation it looks like it can be atmost 5 seconds (default value) unless user has configured to be higher number.

Copy link
Member

@dlvenable dlvenable left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the contribution @hshardeesi !

There is a lot here that could be unit tested to ensure that the behavior remains correct.

@@ -68,6 +90,7 @@ public OffsetAndMetadata addCompletedOffsets(final Range<Long> offsetRange) {
Long maxValue = offsetMinMap.get(committedOffset).getMaximum();
if (maxValue != committedOffset) {
offsetMinMap.remove(committedOffset);
committedRecordCount += (maxValue - committedOffset);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are fairly critical calculations. Can you add unit tests for these with some of the difference scenarios? e.g. start at 0, start at non-zero.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure I'll add unit test for this and other scenarios you pointed out below.

LOG.info("Assigned partition {} with offsets: beginningOffset: {}, endOffset: {}, committedOffset: {}",
topicPartition, getTopicPartitionOffset(beginningOffsets, topicPartition),
getTopicPartitionOffset(endOffsets, topicPartition),
Objects.isNull(offsetAndMetadata) ? "-" : offsetAndMetadata.offset());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line isn't assigning anything. I think you can remove it.

@@ -238,6 +232,21 @@ void processAcknowledgedOffsets() {
acknowledgedOffsets.clear();
}

private void updateCommitCount(final TopicPartition topicPartition, final OffsetAndMetadata offsetAndMetadata) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It appears this only updates the metric, but the name implies it updates an actual count. Perhaps rename to updateCommitCountMetric.

partitionCommitTrackerMap.get(partitionId).addCompletedOffsets(offsetRange.getOffsets());
updateOffsetsToCommit(partition, offsetAndMetadata);
} else {
LOG.error("!! Commit tracker not found for topic: {} partition: {}", partition.topic(), partitionId);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need these exclamation marks. The log level is ERROR which is sufficient here.

updateOffsetsToCommit(partition,
new OffsetAndMetadata(offsetRange.getOffsets().getMaximum() + 1),
offsetRange.getOffsets()));
offsets.forEach((partition, offsetRange) -> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should be unit testing this behavior.

@hshardeesi
Copy link
Contributor Author

Thanks for the contribution @hshardeesi !

There is a lot here that could be unit tested to ensure that the behavior remains correct.

@hshardeesi hshardeesi closed this Aug 24, 2023
@hshardeesi hshardeesi reopened this Aug 24, 2023
Copy link
Member

@dlvenable dlvenable left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for updating the tests!

@dlvenable dlvenable merged commit 89e8f39 into opensearch-project:main Aug 24, 2023
25 of 26 checks passed
opensearch-trigger-bot bot pushed a commit that referenced this pull request Aug 24, 2023
Signed-off-by: Hardeep Singh <[email protected]>
(cherry picked from commit 89e8f39)
dlvenable pushed a commit that referenced this pull request Aug 24, 2023
Signed-off-by: Hardeep Singh <[email protected]>
(cherry picked from commit 89e8f39)

Co-authored-by: Hardeep Singh <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[BUG] Kafka source plugin does not commit offsets when "acknowledgments" are enabled
3 participants