Transactional committing and producing with rebalanceSafeCommits #1425
base: master
Conversation
Just a POC for now; this was the easiest interface change I could think of.
This is nice. Some questions:
The updated However, this is a strange set-up. The I propose that we change the construction of
Re. method
What is
Agreed, providing the If we want to check the settings, we could have a There is no
Yeah, that is a good point. We'd better make very good release notes then :)
Not necessarily, but I guess that is what is needed indeed.
Check, I get it now. Well done.
Great work! Some nitpicks in the comments.
BTW, enforcing `rebalanceSafeCommits` in the consumer would still be nice.
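A minimal sketch of such a check, assuming the consumer implementation has its `ConsumerSettings` in scope; the placement, field access, and error message are illustrative and not part of this PR:

```scala
// Illustrative only: fail fast when a transactional commit is registered on a consumer
// that was not configured with rebalanceSafeCommits. Assumes `settings: ConsumerSettings`
// is available here.
override def registerOffsetsCommittedInTransaction(offsetBatch: OffsetBatch): Task[Unit] =
  ZIO
    .fail(new IllegalStateException("rebalanceSafeCommits must be enabled when committing through the TransactionalProducer"))
    .when(!settings.rebalanceSafeCommits) *>
    runloopAccess.withRunloopZIO(true)(_.registerOffsetsCommittedInTransaction(offsetBatch))
```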
```scala
override def registerOffsetsCommittedInTransaction(
  offsetBatch: OffsetBatch
): Task[Unit] =
  runloopAccess.withRunloopZIO(true)(runloop => runloop.registerOffsetsCommittedInTransaction(offsetBatch))
```
We can consider adding method `registerOffsetsCommittedInTransaction` to `runloopAccess`. This would be more in line with how `ConsumerLive` uses `RunloopAccess` in other places.
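For illustration, such a forwarding method might look roughly like this; a sketch assuming `RunloopAccess` keeps its existing `withRunloopZIO` helper, not code from this PR:

```scala
// Sketch only: let RunloopAccess expose the operation itself, so ConsumerLive
// never reaches into the Runloop directly.
def registerOffsetsCommittedInTransaction(offsetBatch: OffsetBatch): Task[Unit] =
  withRunloopZIO(true)(_.registerOffsetsCommittedInTransaction(offsetBatch))
```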
```scala
/**
 * Used internally by the [[zio.kafka.producer.TransactionalProducer]]
 */
```
Suggested change:

```diff
-/**
- * Used internally by the [[zio.kafka.producer.TransactionalProducer]]
- */
+/**
+ * This is not a public API.
+ * It is used internally by the [[zio.kafka.producer.TransactionalProducer]].
+ */
```
```scala
  TopicPartition,
  OffsetAndMetadata
] => Task[Unit] = offsets =>
  committedOffsetsRef.modify(_.addCommits(Chunk(Commit(java.lang.System.nanoTime(), offsets, null)))).unit
```
Let's put this below `val commit` to keep the order of the trait.
Suggested change:

```diff
-committedOffsetsRef.modify(_.addCommits(Chunk(Commit(java.lang.System.nanoTime(), offsets, null)))).unit
+committedOffsetsRef
+  .modify {
+    // The continuation promise can be `null` because this commit is not handled by the consumer,
+    // and transactional commits have no completion handler anyway.
+    _.addCommits(Chunk(Commit(java.lang.System.nanoTime(), offsets, null)))
+  }
+  .unit
```
```scala
  ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> "200"
),
rebalanceListener = transactionalRebalanceListener(streamCompleteOnRebalanceRef)
ZIO.scoped {
```
Would this extra `scoped` fix the spurious test failures?
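For context, a minimal generic illustration (plain ZIO, not this test's code) of what an extra `ZIO.scoped` changes: it closes the nested scope as soon as the inner effect finishes, so finalizers of resources acquired inside it run before the surrounding effect continues, rather than at the end of the outer scope.

```scala
import zio._

// Generic sketch only: with the inner ZIO.scoped, the resource's finalizer runs
// before the second ZIO.debug; without it, the release would be deferred to the
// end of the enclosing (outer) scope.
val program: UIO[Unit] =
  for {
    _ <- ZIO.scoped {
           ZIO.acquireRelease(ZIO.succeed("resource"))(_ => ZIO.debug("released early")) *>
             ZIO.debug("using resource")
         }
    _ <- ZIO.debug("resource is already released here")
  } yield ()
```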
This simplifies the setup for transactional producing and committing: a custom rebalance listener and the use of `partitionedAssignmentStream` are no longer required, see `ConsumerSpec` (a rough usage sketch follows below). Note that since KIP-447 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics) the transaction ID is no longer used for fencing; the mechanism relies on the consumer group metadata instead.
NOTE: This change is not binary compatible.
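To make the intended simplification concrete, here is a rough, heavily hedged sketch of a transactional consume-produce loop. The `TransactionalProducer.make(producerSettings, consumer)` constructor is an assumption based on this POC's interface change; the streaming and transaction calls use the pre-existing zio-kafka API and may well differ in the final version of this PR.

```scala
import zio._
import zio.kafka.consumer._
import zio.kafka.producer._
import zio.kafka.serde.Serde

// Rough sketch only. `TransactionalProducer.make(producerSettings, consumer)` mirrors the
// interface change proposed in this POC and is an assumption; everything else follows the
// existing zio-kafka API and may change with this PR.
val app: ZIO[Scope, Throwable, Unit] =
  for {
    consumer <- Consumer.make(
                  ConsumerSettings(List("localhost:9092"))
                    .withGroupId("transactional-example")
                    .withRebalanceSafeCommits(true) // required for safe transactional commits
                )
    producerSettings = TransactionalProducerSettings(List("localhost:9092"), "tx-1")
    producer <- TransactionalProducer.make(producerSettings, consumer) // assumed new constructor
    _ <- consumer
           .plainStream(Subscription.topics("input"), Serde.string, Serde.string)
           .mapChunksZIO { records =>
             // One transaction per polled chunk: produce the results and commit the
             // consumed offsets atomically when the transaction's scope closes.
             ZIO.scoped {
               producer.createTransaction.flatMap { tx =>
                 ZIO.foreachDiscard(records) { record =>
                   tx.produce("output", record.key, record.value, Serde.string, Serde.string, Some(record.offset))
                 }
               }
             }.as(Chunk.empty)
           }
           .runDrain
  } yield ()
```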