-
Notifications
You must be signed in to change notification settings - Fork 103
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Long-lived transactions #1156
base: series/2.x
Are you sure you want to change the base?
Long-lived transactions #1156
Conversation
Thanks for this! It looks really thoughtful - much appreciated. I'm a bit (very, very) behind with core library maintenance so I want to catch up on that first, but I'll try to look at this properly soon. |
@bplommer No worries! We are already using this in production but it'd be cool if it was integrated in the library so we don't have to maintain a fork. Seems like the CICD fails during header checks, is that something I can fix? |
I've updated it against the base branch, that should fix this. |
Oh there are some new files that the update didn't touch - you need to run |
@bplommer Should work now. At least now it works locally, unless I'm missing something. |
Enhancement
We've had to use Kafka transactions at work and, fortunately,
fs2-kafka
has a pretty nice API throughTransactionalKafkaProducer
(thanks :D).However, in our use case, we had to hold the Kafka transaction for a long period of time since we were producing substantial amounts of records into Kafka. We also preferred if that operation is atomic.
Unfortunately, it seems to me that
TransactionalKafkaProducer
'sproduce
method begins and commits/aborts a transaction only for one chunk ofProducerRecords
. Unluckily for us, this is not enough as we'd be receiving chunks periodically (from a stream).Proposal
API
An API such as:
would suffice.
However, this can easily introduce invalid state as the consumer of this API could, for example, attempt to commit/abort a transaction that does not even exist. Also, the user could forget to commit/abort the transaction, intentionally or not.
A better approach would be to create a
Transaction
class, that is similar to aTransactionalKafkaProducer
, with the exception that it can only be obtained within aResource
context. This ensures that when the resource is released, the transaction is always handled.This is what is achieved here with:
Example:
We also of course need to ensure the resource is only acquired when the
Semaphore
has one permit. For that, I had to make an internal change, exposingWithTransactionalProducer
's internalSemaphore
:We do have
ExclusiveAccess[F, A]
however it does not suffice as it only grants the permit for one operation (F[A] => F[A]
).Batch committing
Within
Transaction
's implementation, aRef[F, CommittableOffsetBatch[F]]
is used to store all the batches to be committed once the resource is released. Batches are merged together in every call toproduce
.Transaction leaks
Leaks are possible. This is an example from a test that causes them:
Within
Transaction
's implementation, aRef[F, Boolean]
is used to indicate whether or not a transaction has been closed. If there's a call toproduce
when the transaction is over, it throwsTransactionLeakedException
.Would be nice if you have any inputs to make this (if possible) not happen.
All in all, this approach is pretty much identical to what
zio-kafka
currently does with their transaction implementation. But I'd like to hear your opinion on how we can make this better.