Avoid late records preemptively rotating/committing S3 output files #574
Conversation
When late data is arriving on a Kafka partition (e.g. data for the previous hourly encodedPartition), the following check triggers an immediate rotation and commit of files: https://github.com/confluentinc/kafka-connect-storage-cloud/blob/918730d011dcd199e810ec3a68a03ab01c927f62/kafka-connect-s3/src/main/java/io/confluent/connect/s3/TopicPartitionWriter.java#L410

The problem is exacerbated when late data is interleaved with up-to-date data: a quick succession of rotations causes a large number of small files to be committed to S3. This hurts both the performance/throughput of Kafka Connect and downstream consumers, which must deal with the many small file fragments.

This PR adds a new `max.open.files.per.partition` S3SinkConnectorConfig. It defaults to 1, which preserves the existing behavior. If set to a value > 1, the following behavior is enabled (sketched in the example after this description):

- A separate commit file is kept open for each encodedPartition target, up to a maximum of `max.open.files.per.partition`.
- Rotation occurs only when one of the encodedPartition targets hits its rotation condition (`flush.size`, `rotate.interval.ms`), at which point all open files are committed. Committing all files ensures that the S3 sink's pre-commit hook advances the consumer group to a high-watermark offset with no buffered gaps of data still in flight.

It's worth noting that this issue/limitation was previously encountered and is well described in "CC-2313 Handle late arriving records in storage cloud sink connectors" (confluentinc#187). However, that feature was subsequently reverted: confluentinc@a2ce6fc, confluentinc/kafka-connect-storage-common#87

N.B. Unlike the solution proposed in CC-2313, we do not write late data to an incorrect encodedPartition; i.e. late data for hour 7 will not land in a path/file for hour 8.
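To make the proposed behavior concrete, here is a minimal sketch of the buffering scheme, not the actual patch. All names in it (`MultiFileBufferSketch`, `openFiles`, `write`, `commitAll`) are hypothetical illustrations rather than identifiers from `TopicPartitionWriter`, and the real connector tracks per-file rotation conditions rather than raw strings:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: one open "file" per encodedPartition, capped by
// max.open.files.per.partition. Not code from this PR.
class MultiFileBufferSketch {
    private final int maxOpenFilesPerPartition; // value of max.open.files.per.partition
    private final Map<String, StringBuilder> openFiles = new HashMap<>();

    MultiFileBufferSketch(int maxOpenFilesPerPartition) {
        this.maxOpenFilesPerPartition = maxOpenFilesPerPartition;
    }

    /** Returns true when the caller must rotate (commit every open file) before writing. */
    boolean write(String encodedPartition, String record) {
        StringBuilder file = openFiles.get(encodedPartition);
        if (file == null) {
            // A record (e.g. a late one) targets an encodedPartition with no open file.
            // With maxOpenFilesPerPartition == 1 this reproduces the old behavior: any
            // change of target forces an immediate rotation. With a larger cap, rotation
            // happens only once the cap would be exceeded.
            if (openFiles.size() >= maxOpenFilesPerPartition) {
                return true;
            }
            file = new StringBuilder();
            openFiles.put(encodedPartition, file);
        }
        file.append(record).append('\n');
        return false;
    }

    /** Rotation commits all open files, so the committed offset watermark never skips buffered data. */
    void commitAll() {
        openFiles.clear();
    }
}
```

With, say, `max.open.files.per.partition=3`, stragglers for hour 7 and fresh data for hour 8 accumulate in separate open files instead of ping-ponging rotations on every interleaved record.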
Hi, it's not clear to me that the Jenkins-public-CI integration test failures are due to my code changes.
The default baseRecordTimestamp was incorrect, leading to unnecessary file committing and rotation. Update the unit test case accordingly.
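For context, the time-based rotation check in `TopicPartitionWriter` compares each record's timestamp against `baseRecordTimestamp` (the timestamp of the first record in the currently open file). A simplified sketch of why a bad default is costly; the real check also involves the timestamp extractor and the encodedPartition comparison linked in the description above:

```java
// Simplified sketch of the rotate.interval.ms condition, not the exact code.
static boolean shouldRotateOnTime(long recordTimestamp,
                                  long baseRecordTimestamp,
                                  long rotateIntervalMs) {
    // If baseRecordTimestamp defaults to a bogus value such as 0, then
    // recordTimestamp - baseRecordTimestamp is enormous for any real record,
    // so every record appears to exceed the interval and each one triggers an
    // unnecessary commit and rotation.
    return rotateIntervalMs > 0
        && recordTimestamp - baseRecordTimestamp >= rotateIntervalMs;
}
```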
Quick follow-up: we've been running this feature branch stably and successfully for a number of weeks now, and it has both improved throughput and reduced the proliferation of unnecessarily small files when late data is being processed by the S3 Sink Connector. We'd very much prefer to have this merged upstream so we can avoid maintaining our own fork going forward. Cheers!
@kkonstantine I see you worked on and approved #187. Any chance you could help us get this PR reviewed and hopefully merged upstream? Thanks!