Skip to content

Commit

Permalink
update: use application-local locks in a request-local manner
Browse files Browse the repository at this point in the history
  • Loading branch information
kory33 committed Dec 21, 2020
1 parent ed789bb commit b61f4e1
Showing 1 changed file with 51 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,43 @@ package click.seichi.bungeesemaphore.infrastructure.redis
import akka.actor.{ActorSystem, Props}
import akka.util.ByteString
import cats.Monad
import cats.effect.{ContextShift, Effect, IO, Sync}
import cats.effect.concurrent.Deferred
import cats.effect.{Concurrent, ConcurrentEffect, ContextShift, Effect, IO, Sync}
import click.seichi.bungeesemaphore.application.configuration.Configuration
import click.seichi.bungeesemaphore.application.{EffectEnvironment, HasGlobalPlayerDataSaveLock}
import click.seichi.bungeesemaphore.domain.PlayerName
import click.seichi.generic.concurrent.synchronization.barrier.IndexedSwitchableBarrier
import redis.RedisClient

import scala.concurrent.duration.{Duration, FiniteDuration}

object LocalLockRedisBridge {
import cats.implicits._

private def lockRedis[F[_]: Effect](playerName: PlayerName, pxMillis: Option[Long])
(implicit client: RedisClient,
publishingContext: ContextShift[IO]): F[Unit] =
Effect[F].liftIO {
IO.fromFuture {
IO {
client.set(SignalFormat.lockKeyOf(playerName), 0, pxMilliseconds = pxMillis)
}
}.as(())
}

private def isAlreadyUnlockedOnRedis[F[_]: Effect](playerName: PlayerName)
(implicit client: RedisClient,
redisAccessContext: ContextShift[IO]): F[Boolean] =
Effect[F].liftIO {
IO.fromFuture {
IO {
client.get[ByteString](SignalFormat.lockKeyOf(playerName))
}
}
}.map(_.isEmpty)

def bindLocalLockToRedis[
F[_]: Effect
F[_]: ConcurrentEffect
](localLock: IndexedSwitchableBarrier[F, PlayerName])
(implicit configuration: Configuration,
actorSystem: ActorSystem,
Expand All @@ -28,7 +52,7 @@ object LocalLockRedisBridge {
}

Sync[F].delay {
val client = ConfiguredRedisClient()
implicit val client: RedisClient = ConfiguredRedisClient()

// bind the subscriber
actorSystem.actorOf(
Expand All @@ -38,33 +62,34 @@ object LocalLockRedisBridge {

// expose HasGlobalPlayerSemaphore operations to external world
new HasGlobalPlayerDataSaveLock[F] {
override def lock(playerName: PlayerName): F[Unit] = {
Effect[F].liftIO {
IO.fromFuture {
IO {
client.set(SignalFormat.lockKeyOf(playerName), 0, pxMilliseconds = pxMillis)
}
}.as(())
}
}
override def lock(playerName: PlayerName): F[Unit] = lockRedis(playerName, pxMillis)

override def awaitLockAvailability(playerName: PlayerName): F[Unit] = {
for {
// TODO begin critical section
currentlyLocked <- Effect[F].liftIO {
IO.fromFuture {
IO {
client.get[ByteString](SignalFormat.lockKeyOf(playerName))
}
}
}.map(_.nonEmpty)
_ <-
if (currentlyLocked)
localLock(playerName).beginBlock
else
// always begin by blocking the application-local lock,
// since
// - if the lock is not present then the request-local promise
// that is allocated below will be completed soon
// - if the lock is present at this point, then the application-local lock
// would be unblocked when the lock on Redis expires
_ <- localLock(playerName).beginBlock

// allocate an empty promise that is local to this await request
requestLocalPromise <- Deferred[F, Unit]

// concurrently query the Redis, immediately completing
// the request-local promise when the lock is not present
_ <- Concurrent[F].start {
Monad[F].ifM(isAlreadyUnlockedOnRedis(playerName))(
requestLocalPromise.complete(()),
Monad[F].unit
// TODO end critical section
_ <- localLock(playerName).await
)
}

_ <- Concurrent[F].race(
localLock(playerName).await,
requestLocalPromise.get
)
} yield ()
}
}
Expand Down

0 comments on commit b61f4e1

Please sign in to comment.