diff --git a/modules/core/src/main/scala/dev/profunktor/redis4cats/hlist.scala b/modules/core/src/main/scala/dev/profunktor/redis4cats/hlist.scala index 7973cde5..2026851b 100644 --- a/modules/core/src/main/scala/dev/profunktor/redis4cats/hlist.scala +++ b/modules/core/src/main/scala/dev/profunktor/redis4cats/hlist.scala @@ -37,6 +37,15 @@ object hlist { } go(this, HNil) } + + def size: Int = { + def go(ys: HList, acc: Int): Int = + ys match { + case HNil => acc + case HCons(_, t) => go(t, acc + 1) + } + go(this, 0) + } } final case class HCons[+H, +Tail <: HList](head: H, tail: Tail) extends HList diff --git a/modules/core/src/test/scala/dev/profunktor/redis4cats/HListSpec.scala b/modules/core/src/test/scala/dev/profunktor/redis4cats/HListSpec.scala index 8c6f35fa..5f99d5c6 100644 --- a/modules/core/src/test/scala/dev/profunktor/redis4cats/HListSpec.scala +++ b/modules/core/src/test/scala/dev/profunktor/redis4cats/HListSpec.scala @@ -61,10 +61,12 @@ class HListSpec extends FunSuite { test("Conversion from standard list") { val lt = List("a", "b", "c") val hl = "a" :: "b" :: "c" :: HNil - assert(hl == HList.fromList(lt)) + assertEquals[HList, HList](hl, HList.fromList(lt)) + assertEquals(hl.size, lt.size) val el = List.empty[Int] - assert(HNil == HList.fromList(el)) + assertEquals[HList, HList](HNil, HList.fromList(el)) + assertEquals(HNil.size, el.size) } } diff --git a/modules/effects/src/main/scala/dev/profunktor/redis4cats/pipeline.scala b/modules/effects/src/main/scala/dev/profunktor/redis4cats/pipeline.scala index 13a0fb0b..ec5ab96a 100644 --- a/modules/effects/src/main/scala/dev/profunktor/redis4cats/pipeline.scala +++ b/modules/effects/src/main/scala/dev/profunktor/redis4cats/pipeline.scala @@ -16,16 +16,18 @@ package dev.profunktor.redis4cats +import scala.util.control.NoStackTrace + +import cats.Parallel import cats.effect._ import dev.profunktor.redis4cats.effect.Log import dev.profunktor.redis4cats.hlist._ -import scala.util.control.NoStackTrace object pipeline { case object PipelineError extends NoStackTrace - case class RedisPipeline[F[_]: Concurrent: Log: Timer, K, V]( + case class RedisPipeline[F[_]: Concurrent: Log: Parallel: Timer, K, V]( cmd: RedisCommands[F, K, V] ) { diff --git a/modules/effects/src/main/scala/dev/profunktor/redis4cats/runner.scala b/modules/effects/src/main/scala/dev/profunktor/redis4cats/runner.scala index 92cc9bfb..c60fed85 100644 --- a/modules/effects/src/main/scala/dev/profunktor/redis4cats/runner.scala +++ b/modules/effects/src/main/scala/dev/profunktor/redis4cats/runner.scala @@ -16,15 +16,17 @@ package dev.profunktor.redis4cats +import java.util.UUID + +import scala.concurrent.duration._ + +import cats.Parallel import cats.effect._ -import cats.effect.concurrent.Deferred +import cats.effect.concurrent.{ Deferred, Ref } import cats.effect.implicits._ import cats.syntax.all._ import dev.profunktor.redis4cats.effect.Log import dev.profunktor.redis4cats.hlist._ -import java.util.UUID -import java.util.concurrent.TimeUnit -import scala.concurrent.duration._ object Runner { type CancelFibers[F[_]] = Throwable => F[Unit] @@ -38,79 +40,61 @@ object Runner { mkError: () => Throwable ) - def apply[F[_]: Concurrent: Log: Timer]: RunnerPartiallyApplied[F] = + def apply[F[_]: Concurrent: Log: Parallel: Timer]: RunnerPartiallyApplied[F] = new RunnerPartiallyApplied[F] } -private[redis4cats] class RunnerPartiallyApplied[F[_]: Concurrent: Log: Timer] { +private[redis4cats] class RunnerPartiallyApplied[F[_]: Concurrent: Log: Parallel: Timer] { def filterExec[T <: HList, R <: HList, S <: HList](ops: Runner.Ops[F])(commands: T)( implicit w: Witness.Aux[T, R], f: Filter.Aux[R, S] ): F[S] = exec[T, R](ops)(commands).map(_.filterUnit) - /** - * This is unfortunately the easiest way to get optimistic locking to work in a deterministic way. Details follows. - * - * Every transactional command is forked, yielding a Fiber that is part of an HList. Modeling the transaction as - * a `Resource`, we spawn all the fibers representing the commands and simulate a bit of delay to let the underlying - * `ExecutionContext` schedule them before be proceed with the finalizer of the transaction, which always means - * calling EXEC in a successful transaction. - * - * Without this tiny delay, we sometimes get into a race condition where not all the fibers have been scheduled - * (meaning they haven't yet reached Redis), and where EXEC reaches the server before all the commands, making - * the transaction result successful but not always deterministic. - * - * The `OptimisticLockSuite` tests the determinism of this implementation. - * - * A proper way to implement this might be to manage our own `ExecutionContext` so we could tell exactly when all the - * fibers have been scheduled, and only then trigger the EXEC command. This would change the API, though, and it is - * not as easy as it sounds but we can try and experiment with this in the future, if the time allows it. - */ - private def getTxDelay: F[FiniteDuration] = - F.delay { - Duration(sys.env.getOrElse("REDIS4CATS_TX_DELAY", "50.millis")) match { - case fd: FiniteDuration => fd - case x => FiniteDuration(x.toMillis, TimeUnit.MILLISECONDS) - } - } - def exec[T <: HList, R <: HList](ops: Runner.Ops[F])(commands: T)(implicit w: Witness.Aux[T, R]): F[R] = - (Deferred[F, Either[Throwable, w.R]], F.delay(UUID.randomUUID), getTxDelay).tupled.flatMap { - case (promise, uuid, txDelay) => - def cancelFibers[A](fibs: HList)(after: F[Unit])(err: Throwable): F[Unit] = - joinOrCancel(fibs, HNil)(false).void.guarantee(after) >> promise.complete(err.asLeft) + (Deferred[F, Either[Throwable, w.R]], F.delay(UUID.randomUUID)).tupled.flatMap { + case (promise, uuid) => + def cancelFibers[A](fibs: HList)(err: Throwable): F[Unit] = + joinOrCancel(fibs, HNil)(false) >> promise.complete(err.asLeft) def onErrorOrCancelation(fibs: HList): F[Unit] = - cancelFibers(fibs)(ops.onError)(ops.mkError()) - - F.debug(s"${ops.name} started - ID: $uuid") >> - Resource - .makeCase(ops.mainCmd >> runner(commands, HNil)) { - case (fibs, ExitCase.Completed) => - for { - _ <- F.debug(s"${ops.name} completed - ID: $uuid") - _ <- ops.onComplete(cancelFibers(fibs)(F.unit)) - tr <- joinOrCancel(fibs, HNil)(true) - // Casting here is fine since we have a `Witness` that proves this true - _ <- promise.complete(tr.asInstanceOf[w.R].asRight) - } yield () - case (fibs, ExitCase.Error(e)) => - F.error(s"${ops.name} failed: ${e.getMessage} - ID: $uuid") >> - onErrorOrCancelation(fibs) - case (fibs, ExitCase.Canceled) => - F.error(s"${ops.name} canceled - ID: $uuid") >> - onErrorOrCancelation(fibs) - } - .use(_ => F.sleep(txDelay).void) - .guarantee(ops.afterCompletion) >> promise.get.rethrow.timeout(3.seconds) + cancelFibers(fibs)(ops.mkError()).guarantee(ops.onError) + + (Deferred[F, Unit], Ref.of[F, Int](0)).tupled + .flatMap { + case (gate, counter) => + // wait for commands to be scheduled + val synchronizer: F[Unit] = + counter.modify { + case n if n === (commands.size - 1) => n + 1 -> gate.complete(()) + case n => n + 1 -> F.unit + }.flatten + + F.debug(s"${ops.name} started - ID: $uuid") >> + (ops.mainCmd >> runner(synchronizer, commands, HNil)) + .bracketCase(_ => gate.get) { + case (fibs, ExitCase.Completed) => + for { + _ <- F.debug(s"${ops.name} completed - ID: $uuid") + _ <- ops.onComplete(cancelFibers(fibs)) + r <- joinOrCancel(fibs, HNil)(true) + // Casting here is fine since we have a `Witness` that proves this true + _ <- promise.complete(r.asInstanceOf[w.R].asRight) + } yield () + case (fibs, ExitCase.Error(e)) => + F.error(s"${ops.name} failed: ${e.getMessage} - ID: $uuid") >> onErrorOrCancelation(fibs) + case (fibs, ExitCase.Canceled) => + F.error(s"${ops.name} canceled - ID: $uuid") >> onErrorOrCancelation(fibs) + } + .guarantee(ops.afterCompletion) >> promise.get.rethrow.timeout(3.seconds) + } } // Forks every command in order - private def runner[H <: HList, G <: HList](ys: H, res: G): F[HList] = + private def runner[H <: HList, G <: HList](f: F[Unit], ys: H, res: G): F[HList] = ys match { case HNil => F.pure(res) - case HCons((h: F[_] @unchecked), t) => h.start.flatMap(fb => runner(t, fb :: res)) + case HCons((h: F[_] @unchecked), t) => (h, f).parTupled.map(_._1).start.flatMap(fb => runner(f, t, fb :: res)) } // Joins or cancel fibers correspondent to previous executed commands diff --git a/modules/effects/src/main/scala/dev/profunktor/redis4cats/transactions.scala b/modules/effects/src/main/scala/dev/profunktor/redis4cats/transactions.scala index b18c3490..b19e6d91 100644 --- a/modules/effects/src/main/scala/dev/profunktor/redis4cats/transactions.scala +++ b/modules/effects/src/main/scala/dev/profunktor/redis4cats/transactions.scala @@ -16,11 +16,13 @@ package dev.profunktor.redis4cats +import scala.util.control.NoStackTrace + +import cats.Parallel import cats.effect._ import cats.syntax.all._ import dev.profunktor.redis4cats.effect.Log import dev.profunktor.redis4cats.hlist._ -import scala.util.control.NoStackTrace object transactions { @@ -28,7 +30,7 @@ object transactions { case object TransactionAborted extends TransactionError case object TransactionDiscarded extends TransactionError - case class RedisTransaction[F[_]: Concurrent: Log: Timer, K, V]( + case class RedisTransaction[F[_]: Concurrent: Log: Parallel: Timer, K, V]( cmd: RedisCommands[F, K, V] ) { diff --git a/modules/tests/src/test/scala/dev/profunktor/redis4cats/TestScenarios.scala b/modules/tests/src/test/scala/dev/profunktor/redis4cats/TestScenarios.scala index 5e980bb4..7d271bae 100644 --- a/modules/tests/src/test/scala/dev/profunktor/redis4cats/TestScenarios.scala +++ b/modules/tests/src/test/scala/dev/profunktor/redis4cats/TestScenarios.scala @@ -197,7 +197,7 @@ trait TestScenarios { self: FunSuite => val key2 = "key2" for { x <- cmd.get(key1) - _ <- IO(assert(x.isEmpty)) + _ <- IO(assertEquals(x, None)) exist1 <- cmd.exists(key1) _ <- IO(assert(!exist1)) idletime1 <- cmd.objectIdletime(key1) @@ -408,8 +408,6 @@ trait TestScenarios { self: FunSuite => } } - // With the current implementation (see `Runner#getTxDelay`), we cannot guarantee the commands - // are executed in order with the async API but only that the execution is either successful or failed. def transactionScenario(cmd: RedisCommands[IO, String, String]): IO[Unit] = { val key1 = "txtest1" val val1 = "osx" @@ -419,24 +417,25 @@ trait TestScenarios { self: FunSuite => val val3 = "linux" val del1 = "deleteme" - val operations = cmd.set(key1, val1) :: cmd.set(key2, val2) :: cmd.set(key3, val3) :: cmd.del(del1) :: HNil + val operations = cmd.set(key2, val2) :: cmd.get(key1) :: cmd.set(key3, val3) :: cmd.del(del1) :: HNil - for { - _ <- cmd.set(del1, "foo") - _ <- RedisTransaction(cmd).exec(operations).void - v1 <- cmd.get(key1) - v2 <- cmd.get(key2) - v3 <- cmd.get(key3) - d1 <- cmd.exists(del1) - } yield { - assertEquals(v1, Some(val1)) - assertEquals(v2, Some(val2)) - assertEquals(v3, Some(val3)) - assert(!d1) - } + cmd.set(del1, "foo") >> cmd.set(key1, val1) >> + RedisTransaction(cmd) + .filterExec(operations) + .map { + case res1 ~: res2 ~: HNil => + assertEquals(res1, Some(val1)) + assertEquals(res2, 1L) + } + .flatMap { _ => + (cmd.get(key2), cmd.get(key3)).mapN { + case (x, y) => + assertEquals(x, Some(val2)) + assertEquals(y, Some(val3)) + } + } } - // cannot guarantee the order of execution with the async API def transactionDoubleSetScenario(cmd: RedisCommands[IO, String, String]): IO[Unit] = { val key = "txtest-double-set" @@ -456,16 +455,7 @@ trait TestScenarios { self: FunSuite => val commands = cmd.set(key1, "v1") :: cmd.set(key2, "v2") :: cmd.set("tx-3", "v3") :: HNil // We race it with a plain `IO.unit` so the transaction may or may not start at all but the result should be the same - val verifyKey1 = - IO.race(tx.exec(commands).attempt.void, IO.unit) >> - cmd.get(key1).map(assertEquals(_, None)) // no keys written - - // We race it with a sleep to make sure the transaction gets time to start running - val verifyKey2 = - IO.race(tx.exec(commands).attempt.void, IO.sleep(20.millis).void) >> - cmd.get(key2).map(assertEquals(_, None)) // no keys written - - verifyKey1 >> verifyKey2 + IO.race(tx.exec(commands), IO.unit) >> cmd.get(key1).map(assertEquals(_, None)) // no keys written } def scriptsScenario(cmd: RedisCommands[IO, String, String]): IO[Unit] = { diff --git a/site/docs/transactions.md b/site/docs/transactions.md index 8dc1081f..c96f37fc 100644 --- a/site/docs/transactions.md +++ b/site/docs/transactions.md @@ -15,15 +15,9 @@ Note that every command has to be forked (`.start`) because the commands need to These are internals, though. All you need to care about is what commands you want to run as part of a transaction and handle the possible errors and retry logic. -##### Write-only commands - -⚠️ Only writing commands such as `set`, `hset` and `del` are supported. Although using `get` as part of a transaction would compile, the result will be non-deterministic due to the asynchronous nature of the implementation. ⚠️ - -In the future, this might be supported but the only way to get it right is either to use the underlying synchronous API or to use a custom `ExecutionContext` so we can control the order of execution as well as when a command has been effectively dispatched. Arguably, though, the most common cases for transactional commands and optimistic locking involve *only writing* for which the existing API should be good enough. - ##### Concurrent transactions -⚠️ if we want to run transactions concurrently, we need to acquire a connection per transaction (`RedisCommands`), as `MULTI` can not be called concurrently within the same connection, reason why it is recommended to share the same `RedisClient`. ⚠️ +⚠️ in order to run transactions concurrently, you'd need to acquire a connection per transaction (`RedisCommands`), as `MULTI` can not be called concurrently within the same connection. For such cases, it is recommended to share the same `RedisClient`. ⚠️ ### Working with transactions @@ -74,7 +68,7 @@ val showResult: String => Option[String] => IO[Unit] = key => commandsApi.use { cmd => // RedisCommands[IO, String, String] val tx = RedisTransaction(cmd) - val setters = cmd.set(key3, "delete_me") + val setters = cmd.set(key2, "delete_me") >> cmd.set(key3, "foo") val getters = cmd.get(key1).flatTap(showResult(key1)) >> @@ -82,15 +76,16 @@ commandsApi.use { cmd => // RedisCommands[IO, String, String] // the commands type is fully inferred // IO[Unit] :: IO[Option[String]] :: IO[Unit] :: HNil - val commands = cmd.set(key1, "foo") :: cmd.del(key2) :: cmd.set(key3, "bar") :: HNil + val commands = cmd.set(key1, "foo") :: cmd.del(key2) :: cmd.get(key3) :: HNil // the result type is inferred as well // Option[String] :: HNil val prog = tx.filterExec(commands) .flatMap { - case res ~: HNil => - putStrLn(s"Key2 result: $res") + case res1 ~: res2 ~: HNil => + putStrLn(s"Key2 result: $res1") >> + putStrLn(s"Key3 result: $res2") } .onError { case TransactionAborted => @@ -135,7 +130,7 @@ commandsApi.use { cmd => } ``` -You should never pass a transactional command: `MULTI`, `EXEC` or `DISCARD`. +You should never pass a transactional command: `MULTI`, `EXEC` or `DISCARD`. These commands are made available in case you want to handle transactions manually, which you should do *at your own risk*. The following example will result in a successful transaction on Redis. Yet, the operation will end up raising the error passed as a command.