Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transactional committing and producing with rebalanceSafeCommits #1425

Open
wants to merge 8 commits into
base: master
Choose a base branch
from

Conversation

svroonland
Copy link
Collaborator

@svroonland svroonland commented Dec 29, 2024

This simplifies the setup for transactional producing and committing, no longer requiring a custom rebalance listener or the use of partitionedAssignmentStream, see ConsumerSpec.

Note that the transaction ID is no longer used for fencing since https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics, the mechanism relies on the consumer group metadata instead.

NOTE This change is not binary compatible

@svroonland svroonland marked this pull request as ready for review January 2, 2025 10:03
@erikvanoosten
Copy link
Collaborator

This is nice. Some questions:

  • Can we somehow guarantee that rebalanceSafeCommits is enabled when we use this new transactional producer?
  • If we simply roll this out, will applications with the existing transactional producers break? Maybe not, but if that is so, I propose we either make this backward compatible, or we move this to zio-kafka 3.

@erikvanoosten
Copy link
Collaborator

erikvanoosten commented Jan 4, 2025

The updated TransactionProducer is not binary backward compatible; it requires a Consumer in the environment, so the ZIO type is different. It will be source compatible for many applications because many will have a Consumer available anyway.

However, this is a strange set-up. The Consumer and the TransactionProducer are tied together; they must connect to the same Kafka broker cluster. Therefore it makes more sense to produce them together, or at least let one refer to the other.

I propose that we change the construction of TransactionProducers such that they require a Consumer.
This has some advantages:

  • Method TransactionProducer.createTransaction stays backward compatible because its type doesn't change.
  • We could even make it fully backward compatible: if a consumer is provided we do the new thing pioneered in this PR, if its not provided, nothing changes. When a Consumer is provided could also force rebalanceSafeCommits to true.
  • We may even be able to check that the provided Consumer connects to the same broker as the TransactionProducer that is being constructed.

Re. method Consumer.registerOffsetsCommittedInTransaction, this is an internal method and should not be exposed too easily. We could do this by creating a separate trait (e.g. something like ConsumerInternal) and let ConsumerLive extend both traits. This is not the best way, we can probably think of something better.

@erikvanoosten
Copy link
Collaborator

What is Consumer.markCommittedInTransaction actually used for? I don't see where it is used.

@svroonland
Copy link
Collaborator Author

Agreed, providing the Consumer upon TransactionalProducer creation is better. I think it's fine to disregard backwards compatibility here, since the old method was very complicated and not well supported, this is such an improvement in usability.

If we want to check the settings, we could have a Consumer.settings method to retrieve the settings with which the Consumer was created. Is that what you have in mind as well?

There is no Consumer.markCommittedInTransaction, only Committer.markCommittedInTransaction. I'm not sure how we can hide this method, but agree that this is not ideal. In an earlier commit this was done with an internal method on the OffsetBatch, I guess we could revert to that.

@svroonland svroonland added this to the 3.0.0 milestone Jan 5, 2025
@erikvanoosten
Copy link
Collaborator

erikvanoosten commented Jan 5, 2025

I think it's fine to disregard backwards compatibility here...

Yeah, that is a good point. We'd better make very good release notes then :)

we could have a Consumer.settings method ... Is that what you have in mind as well?

Not necessarily, but I guess that is what is needed indeed.

There is no Consumer.markCommittedInTransaction, only Committer.markCommittedInTransaction.

Check, I get it now. Well done.

Copy link
Collaborator

@erikvanoosten erikvanoosten left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great work! Some nitpicks in the comments.

BTW, enforcing rebalanceSafeCommits in the consumer would still be nice.

override def registerOffsetsCommittedInTransaction(
offsetBatch: OffsetBatch
): Task[Unit] =
runloopAccess.withRunloopZIO(true)(runloop => runloop.registerOffsetsCommittedInTransaction(offsetBatch))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can consider adding method registerOffsetsCommittedInTransaction to runloopAccess. This would be more in line with how ConsumerLive uses RunloopAccess in other places.

Comment on lines +162 to +164
/**
* Used internally by the [[zio.kafka.producer.TransactionalProducer]]
*/
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/**
* Used internally by the [[zio.kafka.producer.TransactionalProducer]]
*/
/**
* This is not a public API.
* It is used internally by the [[zio.kafka.producer.TransactionalProducer]].
*/

TopicPartition,
OffsetAndMetadata
] => Task[Unit] = offsets =>
committedOffsetsRef.modify(_.addCommits(Chunk(Commit(java.lang.System.nanoTime(), offsets, null)))).unit
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's put this below val commit to keep the order of the trait.

Suggested change
committedOffsetsRef.modify(_.addCommits(Chunk(Commit(java.lang.System.nanoTime(), offsets, null)))).unit
committedOffsetsRef
.modify {
// The continuation promise can be `null` because this commit is not handled by the consumer,
// and transactional commits have no completion handler anyway.
_.addCommits(Chunk(Commit(java.lang.System.nanoTime(), offsets, null)))
}
.unit

ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> "200"
),
rebalanceListener = transactionalRebalanceListener(streamCompleteOnRebalanceRef)
ZIO.scoped {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this extra scoped fix the spurious test failures?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants