diff --git a/modules/effects/src/main/scala/dev/profunktor/redis4cats/pipeline.scala b/modules/effects/src/main/scala/dev/profunktor/redis4cats/pipeline.scala index 25883eb2..f86ee4ec 100644 --- a/modules/effects/src/main/scala/dev/profunktor/redis4cats/pipeline.scala +++ b/modules/effects/src/main/scala/dev/profunktor/redis4cats/pipeline.scala @@ -17,12 +17,8 @@ package dev.profunktor.redis4cats import cats.effect._ -import cats.effect.concurrent.Deferred -import cats.effect.implicits._ -import cats.implicits._ import dev.profunktor.redis4cats.effect.Log import dev.profunktor.redis4cats.hlist._ -import scala.concurrent.duration._ import scala.util.control.NoStackTrace object pipeline { @@ -31,31 +27,19 @@ object pipeline { case class RedisPipeline[F[_]: Concurrent: Log: Timer, K, V]( cmd: RedisCommands[F, K, V] - ) extends HListRunner[F] { + ) { def exec[T <: HList, R <: HList](commands: T)(implicit w: Witness.Aux[T, R]): F[R] = - Deferred[F, Either[Throwable, w.R]].flatMap { promise => - F.info("Pipeline started") >> - Resource - .makeCase(cmd.disableAutoFlush >> runner(commands, HNil)) { - case ((fibs: HList), ExitCase.Completed) => - for { - _ <- F.info("Pipeline completed") - _ <- cmd.flushCommands - tr <- joinOrCancel(fibs, HNil)(true) - // Casting here is fine since we have a `Witness` that proves this true - _ <- promise.complete(tr.asInstanceOf[w.R].asRight) - } yield () - case ((fibs: HList), ExitCase.Error(e)) => - F.error(s"Pipeline failed: ${e.getMessage}") >> cancelFibers(fibs, PipelineError, promise) - case ((fibs: HList), ExitCase.Canceled) => - F.error("Pipeline canceled") >> cancelFibers(fibs, PipelineError, promise) - case _ => - F.error("Kernel panic: the impossible happened!") - } - .use(_ => F.unit) - .guarantee(cmd.enableAutoFlush) >> promise.get.rethrow.timeout(3.seconds) - } + Runner[F].exec( + Runner.Ops( + name = "Pipeline", + mainCmd = cmd.disableAutoFlush, + onComplete = (_: Runner.CancelFibers[F]) => cmd.flushCommands, + onError = F.unit, + afterCompletion = cmd.enableAutoFlush, + mkError = () => PipelineError + ) + )(commands) } diff --git a/modules/effects/src/main/scala/dev/profunktor/redis4cats/runner.scala b/modules/effects/src/main/scala/dev/profunktor/redis4cats/runner.scala index 3d9ebec6..517c987f 100644 --- a/modules/effects/src/main/scala/dev/profunktor/redis4cats/runner.scala +++ b/modules/effects/src/main/scala/dev/profunktor/redis4cats/runner.scala @@ -22,18 +22,64 @@ import cats.effect.implicits._ import cats.implicits._ import dev.profunktor.redis4cats.effect.Log import dev.profunktor.redis4cats.hlist._ +import scala.concurrent.duration._ -class HListRunner[F[_]: Concurrent: Log] { +object Runner { + type CancelFibers[F[_]] = Throwable => F[Unit] + + case class Ops[F[_]]( + name: String, + mainCmd: F[Unit], + onComplete: CancelFibers[F] => F[Unit], + onError: F[Unit], + afterCompletion: F[Unit], + mkError: () => Throwable + ) + + def apply[F[_]: Concurrent: Log: Timer]: RunnerPartiallyApplied[F] = + new RunnerPartiallyApplied[F] +} + +private[redis4cats] class RunnerPartiallyApplied[F[_]: Concurrent: Log: Timer] { + + def exec[T <: HList, R <: HList](ops: Runner.Ops[F])(commands: T)(implicit w: Witness.Aux[T, R]): F[R] = + Deferred[F, Either[Throwable, w.R]].flatMap { promise => + def cancelFibers[A](fibs: HList)(err: Throwable): F[Unit] = + joinOrCancel(fibs, HNil)(false).void >> promise.complete(err.asLeft) + + F.info(s"${ops.name} started") >> + Resource + .makeCase(ops.mainCmd >> runner(commands, HNil)) { + case ((fibs: HList), ExitCase.Completed) => + for { + _ <- F.info(s"${ops.name} completed") + _ <- ops.onComplete(cancelFibers(fibs)) + tr <- joinOrCancel(fibs, HNil)(true) + // Casting here is fine since we have a `Witness` that proves this true + _ <- promise.complete(tr.asInstanceOf[w.R].asRight) + } yield () + case ((fibs: HList), ExitCase.Error(e)) => + F.error(s"${ops.name} failed: ${e.getMessage}") >> + ops.onError.guarantee(cancelFibers(fibs)(ops.mkError())) + case ((fibs: HList), ExitCase.Canceled) => + F.error(s"${ops.name} canceled") >> + ops.onError.guarantee(cancelFibers(fibs)(ops.mkError())) + case _ => + F.error("Kernel panic: the impossible happened!") + } + .use(_ => F.unit) + .guarantee(ops.afterCompletion) >> promise.get.rethrow.timeout(3.seconds) + } // Forks every command in order - def runner[H <: HList, G <: HList](ys: H, res: G): F[Any] = + private def runner[H <: HList, G <: HList](ys: H, res: G): F[Any] = ys match { case HNil => F.pure(res) case HCons((h: F[_] @unchecked), t) => h.start.flatMap(fb => runner(t, fb :: res)) } // Joins or cancel fibers correspondent to previous executed commands - def joinOrCancel[H <: HList, G <: HList](ys: H, res: G)(isJoin: Boolean): F[Any] = + private def joinOrCancel[H <: HList, G <: HList](ys: H, res: G)(isJoin: Boolean): F[Any] = ys match { case HNil => F.pure(res) case HCons((h: Fiber[F, Any] @unchecked), t) if isJoin => @@ -44,7 +90,4 @@ class HListRunner[F[_]: Concurrent: Log] { F.error(s"Unexpected result: ${h.toString}") >> joinOrCancel(t, res)(isJoin) } - def cancelFibers[A](fibs: HList, err: Throwable, promise: Deferred[F, Either[Throwable, A]]): F[Unit] = - joinOrCancel(fibs, HNil)(false).void >> promise.complete(err.asLeft) - } diff --git a/modules/effects/src/main/scala/dev/profunktor/redis4cats/transactions.scala b/modules/effects/src/main/scala/dev/profunktor/redis4cats/transactions.scala index 0343b975..22e04128 100644 --- a/modules/effects/src/main/scala/dev/profunktor/redis4cats/transactions.scala +++ b/modules/effects/src/main/scala/dev/profunktor/redis4cats/transactions.scala @@ -17,12 +17,9 @@ package dev.profunktor.redis4cats import cats.effect._ -import cats.effect.concurrent._ -import cats.effect.implicits._ import cats.implicits._ import dev.profunktor.redis4cats.effect.Log import dev.profunktor.redis4cats.hlist._ -import scala.concurrent.duration._ import scala.util.control.NoStackTrace object transactions { @@ -33,7 +30,7 @@ object transactions { case class RedisTransaction[F[_]: Concurrent: Log: Timer, K, V]( cmd: RedisCommands[F, K, V] - ) extends HListRunner[F] { + ) { /*** * Exclusively run Redis commands as part of a transaction. @@ -46,29 +43,16 @@ object transactions { * may end in unexpected results such as a dead lock. */ def exec[T <: HList, R <: HList](commands: T)(implicit w: Witness.Aux[T, R]): F[R] = - Deferred[F, Either[Throwable, w.R]].flatMap { promise => - F.info("Transaction started") >> - Resource - .makeCase(cmd.multi >> runner(commands, HNil)) { - case ((fibs: HList), ExitCase.Completed) => - for { - _ <- F.info("Transaction completed") - _ <- cmd.exec.handleErrorWith(e => cancelFibers(fibs, e, promise) >> F.raiseError(e)) - tr <- joinOrCancel(fibs, HNil)(true) - // Casting here is fine since we have a `Witness` that proves this true - _ <- promise.complete(tr.asInstanceOf[w.R].asRight) - } yield () - case ((fibs: HList), ExitCase.Error(e)) => - F.error(s"Transaction failed: ${e.getMessage}") >> - cmd.discard.guarantee(cancelFibers(fibs, TransactionAborted, promise)) - case ((fibs: HList), ExitCase.Canceled) => - F.error("Transaction canceled") >> - cmd.discard.guarantee(cancelFibers(fibs, TransactionAborted, promise)) - case _ => - F.error("Kernel panic: the impossible happened!") - } - .use(_ => F.unit) >> promise.get.rethrow.timeout(3.seconds) - } + Runner[F].exec( + Runner.Ops( + name = "Transaction", + mainCmd = cmd.multi, + onComplete = (f: Runner.CancelFibers[F]) => cmd.exec.handleErrorWith(e => f(e) >> F.raiseError(e)), + onError = cmd.discard, + afterCompletion = F.unit, + mkError = () => TransactionAborted + ) + )(commands) } diff --git a/modules/tests/src/test/scala/dev/profunktor/redis4cats/RedisClusterSpec.scala b/modules/tests/src/test/scala/dev/profunktor/redis4cats/RedisClusterSpec.scala index 2f472786..5192cb45 100644 --- a/modules/tests/src/test/scala/dev/profunktor/redis4cats/RedisClusterSpec.scala +++ b/modules/tests/src/test/scala/dev/profunktor/redis4cats/RedisClusterSpec.scala @@ -40,6 +40,8 @@ class RedisClusterSpec extends Redis4CatsFunSuite(true) with TestScenarios { test("cluster: scripts")(withRedis(scriptsScenario)) + test("cluster: pipelining")(withRedisCluster(pipelineScenario)) + // FIXME: The Cluster impl cannot connect to a single node just yet // test("cluster: transactions")(withRedisCluster(transactionScenario)) diff --git a/modules/tests/src/test/scala/dev/profunktor/redis4cats/RedisSpec.scala b/modules/tests/src/test/scala/dev/profunktor/redis4cats/RedisSpec.scala index a41efbcb..b1ad479a 100644 --- a/modules/tests/src/test/scala/dev/profunktor/redis4cats/RedisSpec.scala +++ b/modules/tests/src/test/scala/dev/profunktor/redis4cats/RedisSpec.scala @@ -36,6 +36,8 @@ class RedisSpec extends Redis4CatsFunSuite(false) with TestScenarios { test("connection api")(withRedis(connectionScenario)) + test("pipelining")(withRedis(pipelineScenario)) + test("transactions: successful")(withRedis(transactionScenario)) test("transactions: canceled")(withRedis(canceledTransactionScenario)) diff --git a/modules/tests/src/test/scala/dev/profunktor/redis4cats/TestScenarios.scala b/modules/tests/src/test/scala/dev/profunktor/redis4cats/TestScenarios.scala index 643cf719..97822edd 100644 --- a/modules/tests/src/test/scala/dev/profunktor/redis4cats/TestScenarios.scala +++ b/modules/tests/src/test/scala/dev/profunktor/redis4cats/TestScenarios.scala @@ -23,7 +23,8 @@ import cats.implicits._ import dev.profunktor.redis4cats.effect.Log import dev.profunktor.redis4cats.effects._ import dev.profunktor.redis4cats.hlist._ -import dev.profunktor.redis4cats.transactions._ +import dev.profunktor.redis4cats.pipeline.RedisPipeline +import dev.profunktor.redis4cats.transactions.RedisTransaction import io.lettuce.core.GeoArgs import scala.concurrent.duration._ @@ -249,17 +250,34 @@ trait TestScenarios { _ <- IO(assert(slowLogLen.isValidLong)) } yield () + def pipelineScenario(cmd: RedisCommands[IO, String, String]): IO[Unit] = { + val key1 = "testp1" + val key2 = "testp2" + + val operations = + cmd.set(key1, "osx") :: cmd.set(key2, "windows") :: cmd.get(key1) :: cmd.sIsMember("foo", "bar") :: + cmd.set(key1, "nix") :: cmd.set(key2, "linux") :: cmd.get(key1) :: HNil + + RedisPipeline(cmd).exec(operations).map { + case _ ~: _ ~: res1 ~: res2 ~: _ ~: _ ~: res3 ~: HNil => + assert(res1.contains("osx")) + assert(res2 === false) + assert(res3.contains("nix")) + case tr => + assert(false, s"Unexpected result: $tr") + } + + } + def transactionScenario(cmd: RedisCommands[IO, String, String]): IO[Unit] = { val key1 = "test1" val key2 = "test2" - val tx = RedisTransaction(cmd) - val operations = cmd.set(key1, "osx") :: cmd.set(key2, "windows") :: cmd.get(key1) :: cmd.sIsMember("foo", "bar") :: cmd.set(key1, "nix") :: cmd.set(key2, "linux") :: cmd.get(key1) :: HNil - tx.exec(operations).map { + RedisTransaction(cmd).exec(operations).map { case _ ~: _ ~: res1 ~: res2 ~: _ ~: _ ~: res3 ~: HNil => assert(res1.contains("osx")) assert(res2 === false)