Expose leaseCounter to users #904

Open
mschuwalow opened this issue Jul 17, 2023 · 6 comments · Fixed by #917

Comments

@mschuwalow
Contributor

The leaseCounter is a monotonically increasing counter.
Together with the shardId (which is also monotonically increasing), it is well suited as a fencing token for fencing off writes from zombie consumers for a given partition key.

To make this use case simpler, we should consider giving users either the leaseCounter at the time the shard was first assigned to the current worker, or a ready-made fencing token formed by the concatenation $shardId-$leaseCounter.
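A minimal sketch of such a token, assuming it is modeled as the numeric part of the shardId paired with the leaseCounter (the FencingToken name, its fields and the render method are hypothetical, not part of zio-kinesis). Comparing the two numbers field by field, rather than comparing the concatenated $shardId-$leaseCounter string, avoids string-ordering surprises such as "1-9" sorting after "1-10"; the string form is only used for display.

```scala
// Hypothetical fencing token: numeric part of the shardId plus the
// leaseCounter observed when the shard was assigned to this worker.
final case class FencingToken(shardId: Long, leaseCounter: Long) {
  // Display form, following the "$shardId-$leaseCounter" idea.
  def render: String = s"$shardId-$leaseCounter"
}

object FencingToken {
  // Order by shardId first, then by leaseCounter, comparing numbers
  // rather than strings.
  implicit val ordering: Ordering[FencingToken] =
    Ordering.by((t: FencingToken) => (t.shardId, t.leaseCounter))
}

object FencingTokenExample extends App {
  val zombie  = FencingToken(shardId = 1L, leaseCounter = 9L)
  val current = FencingToken(shardId = 1L, leaseCounter = 10L)

  // As strings, "1-9" compares greater than "1-10" ...
  println(zombie.render.compareTo(current.render) > 0)
  // ... while the numeric ordering correctly ranks the newer owner higher.
  println(FencingToken.ordering.gt(current, zombie))
}
```

Both lines print true. If a single string token is required downstream, zero-padding both numbers to a fixed width would make string order match numeric order.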

Either option could easily be passed as a fourth element in the tuples returned by Consumer.shardedStream, though that would be a backwards-incompatible change.

mschuwalow changed the title from "Expose leasecounter to users" to "Expose leaseCounter to users" on Jul 17, 2023
@svroonland
Owner

For my understanding, could you elaborate on what would constitute a zombie consumer in zio-kinesis? Is it a consumer that would keep a lease even though it has expired? What events would lead to such a zombie?

Would adding the leaseCounter to DiagnosticEvent.LeaseAcquired be a good alternative for your use case? Or does the checkpoint field in that case class suffice?

@mschuwalow
Contributor Author

What I meant is that a consumer only notices that it has lost the lease for a shard when it runs refreshLeases or tries to commit a checkpoint. So it's not really about a consumer that keeps a lease after it has expired; rather, it keeps acting as the owner until it notices that the lease is gone.
During that window it runs concurrently with the new consumer that now owns the shard, so writes to data stores should be fenced.
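To illustrate why the old owner only finds out at its next refresh or checkpoint, here is a small in-memory model of a lease table. It assumes the usual optimistic-locking scheme of KCL-style lease tables: every successful update bumps the counter, and a checkpoint is a conditional write that succeeds only if the caller's owner and counter still match. LeaseRow, LeaseTable and their methods are hypothetical stand-ins, not the zio-kinesis API.

```scala
import scala.collection.mutable

// Hypothetical model of one lease-table row (not the zio-kinesis Lease type).
final case class LeaseRow(shardId: String, owner: String, counter: Long, checkpoint: Option[String])

// Hypothetical lease table: successful updates bump the counter, and
// checkpoints are conditional on the caller's (owner, counter) still matching.
final class LeaseTable {
  private val rows = mutable.Map.empty[String, LeaseRow]

  def create(shardId: String, owner: String): LeaseRow = {
    val row = LeaseRow(shardId, owner, counter = 1L, checkpoint = None)
    rows.update(shardId, row)
    row
  }

  // Another worker takes the lease over, e.g. because it looked expired.
  // The previous owner is not notified.
  def takeOver(shardId: String, newOwner: String): LeaseRow = {
    val current = rows(shardId)
    val taken   = current.copy(owner = newOwner, counter = current.counter + 1)
    rows.update(shardId, taken)
    taken
  }

  // The old owner only learns that it lost the lease here, when its
  // conditional write fails.
  def checkpoint(held: LeaseRow, sequenceNumber: String): Either[String, LeaseRow] =
    rows.get(held.shardId) match {
      case Some(current) if current.owner == held.owner && current.counter == held.counter =>
        val updated = current.copy(counter = current.counter + 1, checkpoint = Some(sequenceNumber))
        rows.update(held.shardId, updated)
        Right(updated)
      case _ =>
        Left(s"${held.owner} no longer holds the lease for ${held.shardId}")
    }
}

object ZombieExample extends App {
  val table    = new LeaseTable
  val oldLease = table.create("shardId-000000000001", owner = "worker-a")
  val newLease = table.takeOver("shardId-000000000001", newOwner = "worker-b")

  // Until this call, worker-a may still be processing records and writing
  // to downstream stores concurrently with worker-b.
  println(table.checkpoint(oldLease, "100")) // rejected: worker-a is a zombie
  println(table.checkpoint(newLease, "100")) // accepted: counter goes from 2 to 3
}
```

In this model the counter a worker sees at assignment is always higher than any counter the previous owner held, which is the property that makes it usable for fencing.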

Adding it to the diagnostic events would work, though the ergonomics would be poor. The approach I had in mind was to use the leaseCounter as soon as you start consuming the shard:

```scala
Consumer
  .shardedStream(???)
  // fencingToken is the proposed 4th tuple element; it does not exist today
  .flatMapPar(Int.MaxValue) { case (shardId, shardStream, checkpointer, fencingToken) =>
    // Use the fencing token (based on the leaseCounter) to fence any writes
    // by other consumers for the same partition key
    ???
  }
```

Digging the correct leaseCounter out of the diagnostic events at that point would be clumsy.

@svroonland
Owner

What about being able to retrieve the current sequence number from the checkpointer and combining it with the current timestamp? Would that work as a fencing token?

@mschuwalow
Contributor Author

mschuwalow commented Aug 10, 2023

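I would not use the timestamp, but being able to retrieve the sequence number from the checkpointer would work perfectly.
Currently I use the LeaseRepository, but that has obvious shortcomings (it polls the lease table, with retries, until this worker shows up as the owner of the shard):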

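```scala
import zio._
import zio.stream.ZStream
import nl.vroste.zio.kinesis.client.zionative.LeaseRepository

object FencingTokens {
  // Stand-ins for types the snippet uses but does not show; their exact
  // definitions are assumed here.
  final case class FencingToken(shardId: Long, leaseCounter: Long)
  final case class FailedToExtractShardId(str: String)
      extends Exception(s"Failed to extract shard id from '$str'")

  // Kinesis shard ids look like "shardId-000000000001"
  private final val shardIdRegex = "shardId-(\\d{12})".r

  private def extractShardId(str: String) = str match {
    case shardIdRegex(id) => ZIO.succeed(id.toLong)
    case _                => ZIO.fail(FailedToExtractShardId(str))
  }

  // Poll the lease table until this worker is the owner of the shard, then
  // return that lease's counter (retrying every second, up to 30 times)
  private def getLeaseCounter(shardId: String, tableName: String, workerIdentifier: String) =
    ZStream
      .serviceWithStream[LeaseRepository](_.getLeases(tableName))
      .filter(lease => lease.key == shardId && lease.owner.contains(workerIdentifier))
      .runHead
      .someOrFailException
      .retry(Schedule.spaced(1.second) && Schedule.recurs(30))
      .map(_.counter)

  private def makeFencingToken(shardIdStr: String, tableName: String, workerIdentifier: String) =
    for {
      shardId      <- extractShardId(shardIdStr)
      leaseCounter <- getLeaseCounter(shardIdStr, tableName, workerIdentifier)
      token         = FencingToken(shardId, leaseCounter)
      _            <- ZIO.logInfo(s"Made fencing token for shard $shardIdStr: $token")
    } yield token
}
```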

@mschuwalow
Contributor Author

mschuwalow commented Feb 28, 2024

@svroonland I only got around to upgrading and using the new functionality now, and realized that the linked PR is not actually what I need 🤦

What I want for fencing (e.g. ensuring that zombies cannot overwrite data in a DB) is a value that increases monotonically whenever a new consumer is assigned to a particular shard: the lease counter.
The last checkpoint does not work here, because it is not guaranteed to increase when the shard moves to a new consumer; in principle, multiple consumers could therefore hold the same value.
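A minimal sketch of this kind of fencing, assuming the data store can perform a conditional write per partition key. FencedStore is hypothetical: it remembers the highest lease counter that has written each key and rejects writes that carry a lower one.

```scala
import scala.collection.mutable

// Hypothetical downstream store that fences writes per partition key.
// A real database would need an atomic conditional write instead of this
// unsynchronized map.
final class FencedStore {
  // partition key -> (highest lease counter accepted so far, stored value)
  private val rows = mutable.Map.empty[String, (Long, String)]

  // Accepts the write unless a writer with a higher lease counter has
  // already written this key.
  def write(partitionKey: String, leaseCounter: Long, value: String): Boolean =
    rows.get(partitionKey) match {
      case Some((highest, _)) if leaseCounter < highest =>
        false // stale writer, e.g. a zombie consumer
      case _ =>
        rows.update(partitionKey, (leaseCounter, value))
        true
    }

  def read(partitionKey: String): Option[String] = rows.get(partitionKey).map(_._2)
}

object FencingExample extends App {
  val store = new FencedStore

  // Lease counters captured when each worker was assigned the shard.
  val zombieCounter   = 1L
  val newOwnerCounter = 2L

  println(store.write("user-42", newOwnerCounter, "written by new owner")) // true
  println(store.write("user-42", zombieCounter, "written by zombie"))      // false
  println(store.read("user-42")) // Some(written by new owner)
}
```

Equal counters are deliberately not rejected, so the same owner can keep writing. That is also why a checkpoint-based value falls short: if the zombie and the new owner present the same checkpoint, this check cannot tell them apart.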

If you could also expose that from the checkpointer, I would be a happy camper 🙏

@svroonland
Owner

No problem, let's reopen the issue then. The Checkpointer currently does not have the lease counter in scope.

svroonland reopened this on Mar 2, 2024
Labels
None yet
Projects
None yet

2 participants