
Avoid late records preemptively rotating/committing S3 output files #574

Open
frankgrimes97 wants to merge 2 commits into master from feature/avoid-many-small-s3-files-on-late-data

Conversation

@frankgrimes97

When late data arrives on a Kafka partition (e.g. data for the previous hourly encodedPartition), the following check triggers an immediate rotation and commit of files:

https://github.com/confluentinc/kafka-connect-storage-cloud/blob/918730d011dcd199e810ec3a68a03ab01c927f62/kafka-connect-s3/src/main/java/io/confluent/connect/s3/TopicPartitionWriter.java#L410

The problem is exacerbated when late data is interleaved with up-to-date data: a quick succession of rotations causes a large number of small files to be committed to S3. This hurts both the performance/throughput of Kafka Connect and downstream consumers, which need to deal with the many small file fragments.
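
For context, here is a simplified sketch of the kind of rotation check involved; the class and field names are assumptions modeled on the linked TopicPartitionWriter code, not a verbatim excerpt:

```java
// Simplified sketch; names are modeled on TopicPartitionWriter, not quoted from it.
class RotationCheck {
  long rotateIntervalMs;          // rotate.interval.ms from the connector config
  long baseRecordTimestamp;       // timestamp of the first record in the open file
  String currentEncodedPartition; // encodedPartition of the currently open file
  int recordCount;                // records buffered in the open file

  boolean rotateOnTime(String encodedPartition, long recordTimestamp) {
    if (recordCount <= 0) {
      return false;
    }
    // Rotate when the rotation interval has elapsed, OR as soon as a record
    // maps to a different encodedPartition than the currently open one. A late
    // record (e.g. one for the previous hour) trips the second condition
    // immediately, committing a possibly tiny file.
    return rotateIntervalMs > 0
        && (recordTimestamp - baseRecordTimestamp >= rotateIntervalMs
            || !encodedPartition.equals(currentEncodedPartition));
  }
}
```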

This PR adds a new `max.open.files.per.partition` S3SinkConnectorConfig property. It defaults to 1, which preserves the existing behavior.

If set to a value > 1, the following behavior is enabled (see the example configuration below):

- A separate commit file is kept open for each encodedPartition target, up to a maximum of `max.open.files.per.partition`.

- Rotation occurs only when one of the encodedPartition targets hits its rotation condition (`flush.size`, `rotate.interval.ms`), at which point all open files are committed. All files are committed so that the S3 Sink's pre-commit hook commits a high-watermark offset to the Kafka consumer group, without buffered gaps of data still being in flight when that happens.
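
To illustrate, a minimal sink configuration opting into the new behavior might look like the following; the topic/bucket names and values are hypothetical, and only `max.open.files.per.partition` is the property added by this PR:

```properties
name=s3-sink
connector.class=io.confluent.connect.s3.S3SinkConnector
# Hypothetical topic and bucket names
topics=events
s3.bucket.name=my-bucket
storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.json.JsonFormat
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
partition.duration.ms=3600000
locale=en-US
timezone=UTC
timestamp.extractor=Record
flush.size=10000
rotate.interval.ms=3600000
# New in this PR: keep up to 3 encodedPartition targets open at once (e.g. the
# current hour plus two late hours) instead of rotating as soon as a record
# for another hour arrives.
max.open.files.per.partition=3
```

With such a configuration, a late hour-7 record arriving while the hour-8 file is open is appended to a still-open hour-7 file rather than forcing an immediate rotate/commit cycle.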

It's worth noting that this issue/limitation was previously encountered and is well described as part of:
"CC-2313 Handle late arriving records in storage cloud sink connectors" #187

However, that feature was subsequently reverted:
a2ce6fc confluentinc/kafka-connect-storage-common#87

N.B. Unlike the solution proposed in CC-2313, we do not opt to write late data to an incorrect encodedPartition; i.e. late data for hour 7 will not land in a path/file for hour 8.
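
Concretely (hypothetical bucket, topic, and offsets, using the S3 Sink's usual `<topic>+<kafkaPartition>+<startOffset>` object naming), late hour-7 data written while hour 8 is in progress still lands under the hour-7 prefix:

```
s3://my-bucket/topics/events/year=2022/month=10/day=21/hour=07/events+0+0000012345.json
s3://my-bucket/topics/events/year=2022/month=10/day=21/hour=08/events+0+0000012001.json
```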

@frankgrimes97 frankgrimes97 requested a review from a team as a code owner October 21, 2022 15:08
@frankgrimes97 frankgrimes97 force-pushed the feature/avoid-many-small-s3-files-on-late-data branch from 9351a51 to 0af783e on October 21, 2022 15:22
@frankgrimes97 frankgrimes97 changed the title from "Avoid late records preemptively rotating/commiting S3 output files" to "Avoid late records preemptively rotating/committing S3 output files" on Oct 21, 2022
@frankgrimes97 (Author)

Hi, it's not clear to me that the Jenkins-public-CI integration test failures are due to my code changes.
"AmazonS3Exception: Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied;"
Is that CI check broken? (I see most other outside-contributed PRs have similar failed checks)

The default baseRecordTimestamp was incorrect, leading to unnecessary file committing and rotation. Updated the unit test case accordingly.
@frankgrimes97 (Author)

Quick follow-up... we've been running this feature branch stably and successfully for a number of weeks now, and it has both improved throughput and reduced the proliferation of unnecessarily small files when late data is processed by the S3 Sink Connector.

We'd very much prefer to have this ultimately merged upstream, to avoid needing to maintain our own fork going forward.
Is there any interest on Confluent's side in working with us to get this merged?

Cheers!

@frankgrimes97 (Author)

@kkonstantine I see you worked on and approved #187. Any chance you could help us get this PR reviewed and hopefully merged upstream? Thanks!
