Expose leaseCounter to users #904
For my understanding, could you elaborate on what would constitute a zombie consumer in zio-kinesis? Is it a consumer that would keep a lease even though it has expired? What events would lead to such a zombie? Would adding the …
What I meant here is that a consumer will only notice that it lost the lease for a shard when doing a […]. Adding it to the diagnostic events would work, though the ergonomics would be poor. The approach I had in mind was to use the lease counter immediately when you start consuming the shard:

```scala
Consumer
  .shardedStream(???)
  .flatMapPar(Int.MaxValue) { case (shardId, shardStream, checkpointer, fencingToken) =>
    // use fencingToken (based on the lease counter) to fence any writes by consumers for the same partition key
    ???
  }
```

Digging the correct lease counter out of the diagnostic events at that point would be a bit clumsy.
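To make the fencing idea concrete, here is a minimal, library-free sketch of the write side: a store that remembers the highest token seen per partition key and rejects writes carrying a lower one. `FencedStore` and its methods are hypothetical (not zio-kinesis APIs), and the token is assumed to be a `(shardId, leaseCounter)` pair compared lexicographically; a real database would perform the same check atomically, for example as a conditional update.

```scala
import scala.collection.mutable
import scala.math.Ordering.Implicits._

// Hypothetical in-memory store illustrating fencing; not a zio-kinesis API.
// A token is assumed to be (shardId, leaseCounter), compared lexicographically.
final class FencedStore {
  private val highestToken = mutable.Map.empty[String, (Long, Long)]
  private val values       = mutable.Map.empty[String, String]

  // Applies the write only if `token` is at least the highest token seen for this key.
  // Returns true if the write was applied, false if it was fenced off.
  def write(partitionKey: String, token: (Long, Long), value: String): Boolean =
    synchronized {
      highestToken.get(partitionKey) match {
        case Some(seen) if token < seen => false // stale writer (zombie): reject
        case _ =>
          highestToken(partitionKey) = token
          values(partitionKey) = value
          true
      }
    }

  def read(partitionKey: String): Option[String] = synchronized(values.get(partitionKey))
}
```

Equal tokens are accepted so that the current lease owner can keep writing; only strictly lower tokens are rejected.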
What about being able to retrieve the current sequence number from the …
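I would not use the timestamp, but having access to the checkpointer would work perfectly.

```scala
import zio._
import zio.stream.ZStream
// LeaseRepository is zio-kinesis's lease table abstraction; FailedToExtractShardId and
// FencingToken are defined elsewhere in the application (not shown).

// Capture the 12-digit numeric suffix of a shard ID such as "shardId-000000000012".
private final val shardIdRegex = "shardId-(\\d{12})".r

private def extractShardId(str: String) = str match {
  case shardIdRegex(id) => ZIO.succeed(id.toLong)
  case _                => ZIO.fail(FailedToExtractShardId(str))
}

// Find the lease for this shard that is owned by this worker and return its counter.
// Retry every second, up to 30 times, in case no matching lease is found yet.
private def getLeaseCounter(shardId: String, tableName: String, workerIdentifier: String) =
  ZStream
    .serviceWithStream[LeaseRepository](_.getLeases(tableName))
    .filter(lease => lease.key == shardId && lease.owner.contains(workerIdentifier))
    .runHead
    .someOrFailException // fails with NoSuchElementException if no matching lease exists
    .retry(Schedule.spaced(1.second) && Schedule.recurs(30))
    .map(_.counter)

// Combine the numeric shard ID and the lease counter into a fencing token.
private def makeFencingToken(shardIdStr: String, tableName: String, workerIdentifier: String) =
  for {
    shardId      <- extractShardId(shardIdStr)
    leaseCounter <- getLeaseCounter(shardIdStr, tableName, workerIdentifier)
    token         = FencingToken(shardId, leaseCounter)
    _            <- ZIO.logInfo(s"Made fencing token for shard $shardIdStr: $token")
  } yield token
```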
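The shard-ID parsing step of `makeFencingToken` can be checked in isolation with plain Scala. This is a sketch that returns `Either` instead of a ZIO effect; `ShardIds.parse` is a hypothetical helper, and it relies on the same 12-digit regex as the snippet above.

```scala
// Hypothetical plain-Scala counterpart of extractShardId, returning Either instead of ZIO.
object ShardIds {
  // Same pattern as above: "shardId-" followed by exactly 12 digits.
  private val shardIdRegex = "shardId-(\\d{12})".r

  // Regex extractors match the whole string, so partial matches are rejected.
  def parse(str: String): Either[String, Long] = str match {
    case shardIdRegex(digits) => Right(digits.toLong)
    case _                    => Left(s"Not a Kinesis shard ID: $str")
  }
}
```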
@svroonland I only got around to upgrading and using the new functionality now, and realized that the linked PR is not actually what I need 🤦 What I want for fencing (e.g. ensuring that zombies cannot overwrite data in a DB) is a value that increases monotonically whenever a new consumer is assigned to a particular shard: the lease counter. If you could also expose that from the checkpointer, I would be a happy camper 🙏
No problem, let's reopen the issue then. The Checkpointer currently does not have the lease counter in scope.
The leaseCounter is a monotonically increasing counter.
Together with the shardId (which also increases monotonically), the leaseCounter is well suited as a fencing token to fence off writes from zombie consumers for a given partition key.
To make this use case simpler, we should consider giving users either the leaseCounter at the time the shard was first assigned to the current worker, or a ready-made fencing token that is the concatenation `$shardId-$leaseCounter`.
Either option could easily be passed as a 4th element in the tuples returned from `Consumer.shardedStream`, though that would be a backward-incompatible change.
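A typed token avoids a pitfall with the string form: concatenated strings compare lexicographically, so `"9-5"` sorts after `"10-1"` even though shard 10 is the newer one. The sketch below assumes a hypothetical `FencingToken` case class (zio-kinesis does not currently provide one) holding the numeric shard ID and the lease counter, ordered by shard ID first and lease counter second.

```scala
// Hypothetical fencing token; not an existing zio-kinesis type.
final case class FencingToken(shardId: Long, leaseCounter: Long) {
  // Same textual form as the "$shardId-$leaseCounter" concatenation; for display/logging only.
  def render: String = s"$shardId-$leaseCounter"
}

object FencingToken {
  // Compare numerically: shardId first, then leaseCounter.
  implicit val ordering: Ordering[FencingToken] =
    Ordering.by((t: FencingToken) => (t.shardId, t.leaseCounter))
}
```

Ordering by shard ID first relies on shard IDs increasing monotonically, as stated above; within one shard, the lease counter then decides.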