From 5b9629b05e44ff52a29091d8a8f2baac1d54c877 Mon Sep 17 00:00:00 2001 From: Gabriel Volpe Date: Sat, 9 May 2020 00:31:53 +0200 Subject: [PATCH 1/2] Redis pipelining as in transactions; --- .../dev/profunktor/redis4cats/pipeline.scala | 42 +++++++++--- .../dev/profunktor/redis4cats/redis.scala | 6 +- .../dev/profunktor/redis4cats/runner.scala | 50 ++++++++++++++ .../profunktor/redis4cats/transactions.scala | 65 ++++++------------- .../redis4cats/RedisPipelineDemo.scala | 37 +++++++---- .../redis4cats/RedisTransactionsDemo.scala | 5 +- site/docs/pipelining.md | 48 +++++++++----- 7 files changed, 165 insertions(+), 88 deletions(-) create mode 100644 modules/effects/src/main/scala/dev/profunktor/redis4cats/runner.scala diff --git a/modules/effects/src/main/scala/dev/profunktor/redis4cats/pipeline.scala b/modules/effects/src/main/scala/dev/profunktor/redis4cats/pipeline.scala index e1b045b8..25883eb2 100644 --- a/modules/effects/src/main/scala/dev/profunktor/redis4cats/pipeline.scala +++ b/modules/effects/src/main/scala/dev/profunktor/redis4cats/pipeline.scala @@ -17,24 +17,46 @@ package dev.profunktor.redis4cats import cats.effect._ +import cats.effect.concurrent.Deferred import cats.effect.implicits._ import cats.implicits._ import dev.profunktor.redis4cats.effect.Log +import dev.profunktor.redis4cats.hlist._ +import scala.concurrent.duration._ +import scala.util.control.NoStackTrace object pipeline { - case class RedisPipeline[F[_]: Bracket[*[_], Throwable]: Log, K, V]( + case object PipelineError extends NoStackTrace + + case class RedisPipeline[F[_]: Concurrent: Log: Timer, K, V]( cmd: RedisCommands[F, K, V] - ) { - def run[A](fa: F[A]): F[A] = - F.info("Pipeline started") *> - cmd.disableAutoFlush - .bracketCase(_ => fa) { - case (_, ExitCase.Completed) => cmd.flushCommands *> F.info("Pipeline completed") - case (_, ExitCase.Error(e)) => F.error(s"Pipeline failed: ${e.getMessage}") - case (_, ExitCase.Canceled) => F.error("Pipeline canceled") + ) extends HListRunner[F] { + + def exec[T <: HList, R <: HList](commands: T)(implicit w: Witness.Aux[T, R]): F[R] = + Deferred[F, Either[Throwable, w.R]].flatMap { promise => + F.info("Pipeline started") >> + Resource + .makeCase(cmd.disableAutoFlush >> runner(commands, HNil)) { + case ((fibs: HList), ExitCase.Completed) => + for { + _ <- F.info("Pipeline completed") + _ <- cmd.flushCommands + tr <- joinOrCancel(fibs, HNil)(true) + // Casting here is fine since we have a `Witness` that proves this true + _ <- promise.complete(tr.asInstanceOf[w.R].asRight) + } yield () + case ((fibs: HList), ExitCase.Error(e)) => + F.error(s"Pipeline failed: ${e.getMessage}") >> cancelFibers(fibs, PipelineError, promise) + case ((fibs: HList), ExitCase.Canceled) => + F.error("Pipeline canceled") >> cancelFibers(fibs, PipelineError, promise) + case _ => + F.error("Kernel panic: the impossible happened!") } - .guarantee(cmd.enableAutoFlush) + .use(_ => F.unit) + .guarantee(cmd.enableAutoFlush) >> promise.get.rethrow.timeout(3.seconds) + } + } } diff --git a/modules/effects/src/main/scala/dev/profunktor/redis4cats/redis.scala b/modules/effects/src/main/scala/dev/profunktor/redis4cats/redis.scala index c2ce157b..52421837 100644 --- a/modules/effects/src/main/scala/dev/profunktor/redis4cats/redis.scala +++ b/modules/effects/src/main/scala/dev/profunktor/redis4cats/redis.scala @@ -259,13 +259,13 @@ private[redis4cats] class BaseRedis[F[_]: Concurrent: ContextShift, K, V]( /******************************* AutoFlush API **********************************/ override def enableAutoFlush: F[Unit] = - async.flatMap(c => F.delay(c.setAutoFlushCommands(true))) + blocker.blockOn(async.flatMap(c => blocker.delay(c.setAutoFlushCommands(true)))) override def disableAutoFlush: F[Unit] = - async.flatMap(c => F.delay(c.setAutoFlushCommands(false))) + blocker.blockOn(async.flatMap(c => blocker.delay(c.setAutoFlushCommands(false)))) override def flushCommands: F[Unit] = - async.flatMap(c => F.delay(c.flushCommands())) + blocker.blockOn(async.flatMap(c => blocker.delay(c.flushCommands()))) /******************************* Strings API **********************************/ override def append(key: K, value: V): F[Unit] = diff --git a/modules/effects/src/main/scala/dev/profunktor/redis4cats/runner.scala b/modules/effects/src/main/scala/dev/profunktor/redis4cats/runner.scala new file mode 100644 index 00000000..3d9ebec6 --- /dev/null +++ b/modules/effects/src/main/scala/dev/profunktor/redis4cats/runner.scala @@ -0,0 +1,50 @@ +/* + * Copyright 2018-2020 ProfunKtor + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.profunktor.redis4cats + +import cats.effect._ +import cats.effect.concurrent.Deferred +import cats.effect.implicits._ +import cats.implicits._ +import dev.profunktor.redis4cats.effect.Log +import dev.profunktor.redis4cats.hlist._ + +class HListRunner[F[_]: Concurrent: Log] { + + // Forks every command in order + def runner[H <: HList, G <: HList](ys: H, res: G): F[Any] = + ys match { + case HNil => F.pure(res) + case HCons((h: F[_] @unchecked), t) => h.start.flatMap(fb => runner(t, fb :: res)) + } + + // Joins or cancel fibers correspondent to previous executed commands + def joinOrCancel[H <: HList, G <: HList](ys: H, res: G)(isJoin: Boolean): F[Any] = + ys match { + case HNil => F.pure(res) + case HCons((h: Fiber[F, Any] @unchecked), t) if isJoin => + h.join.flatMap(x => joinOrCancel(t, x :: res)(isJoin)) + case HCons((h: Fiber[F, Any] @unchecked), t) => + h.cancel.flatMap(x => joinOrCancel(t, x :: res)(isJoin)) + case HCons(h, t) => + F.error(s"Unexpected result: ${h.toString}") >> joinOrCancel(t, res)(isJoin) + } + + def cancelFibers[A](fibs: HList, err: Throwable, promise: Deferred[F, Either[Throwable, A]]): F[Unit] = + joinOrCancel(fibs, HNil)(false).void >> promise.complete(err.asLeft) + +} diff --git a/modules/effects/src/main/scala/dev/profunktor/redis4cats/transactions.scala b/modules/effects/src/main/scala/dev/profunktor/redis4cats/transactions.scala index 5b701079..0343b975 100644 --- a/modules/effects/src/main/scala/dev/profunktor/redis4cats/transactions.scala +++ b/modules/effects/src/main/scala/dev/profunktor/redis4cats/transactions.scala @@ -33,7 +33,7 @@ object transactions { case class RedisTransaction[F[_]: Concurrent: Log: Timer, K, V]( cmd: RedisCommands[F, K, V] - ) { + ) extends HListRunner[F] { /*** * Exclusively run Redis commands as part of a transaction. @@ -47,50 +47,27 @@ object transactions { */ def exec[T <: HList, R <: HList](commands: T)(implicit w: Witness.Aux[T, R]): F[R] = Deferred[F, Either[Throwable, w.R]].flatMap { promise => - // Forks every command in order - def runner[H <: HList, G <: HList](ys: H, res: G): F[Any] = - ys match { - case HNil => F.pure(res) - case HCons((h: F[_] @unchecked), t) => h.start.flatMap(fb => runner(t, fb :: res)) - } - - // Joins or cancel fibers correspondent to previous executed commands - def joinOrCancel[H <: HList, G <: HList](ys: H, res: G)(isJoin: Boolean): F[Any] = - ys match { - case HNil => F.pure(res) - case HCons((h: Fiber[F, Any] @unchecked), t) if isJoin => - h.join.flatMap(x => joinOrCancel(t, x :: res)(isJoin)) - case HCons((h: Fiber[F, Any] @unchecked), t) => - h.cancel.flatMap(x => joinOrCancel(t, x :: res)(isJoin)) - case HCons(h, t) => - F.error(s"Unexpected result: ${h.toString}") >> joinOrCancel(t, res)(isJoin) - } - - def cancelFibers(fibs: HList, err: Throwable = TransactionAborted): F[Unit] = - joinOrCancel(fibs, HNil)(false).void >> promise.complete(err.asLeft) - - val tx = - Resource.makeCase(cmd.multi >> runner(commands, HNil)) { - case ((fibs: HList), ExitCase.Completed) => - for { - _ <- F.info("Transaction completed") - _ <- cmd.exec.handleErrorWith(e => cancelFibers(fibs, e) >> F.raiseError(e)) - tr <- joinOrCancel(fibs, HNil)(true) - // Casting here is fine since we have a `Witness` that proves this true - _ <- promise.complete(tr.asInstanceOf[w.R].asRight) - } yield () - case ((fibs: HList), ExitCase.Error(e)) => - F.error(s"Transaction failed: ${e.getMessage}") >> - cmd.discard.guarantee(cancelFibers(fibs)) - case ((fibs: HList), ExitCase.Canceled) => - F.error("Transaction canceled") >> - cmd.discard.guarantee(cancelFibers(fibs)) - case _ => - F.error("Kernel panic: the impossible happened!") - } - F.info("Transaction started") >> - (tx.use(_ => F.unit) >> promise.get.rethrow).timeout(3.seconds) + Resource + .makeCase(cmd.multi >> runner(commands, HNil)) { + case ((fibs: HList), ExitCase.Completed) => + for { + _ <- F.info("Transaction completed") + _ <- cmd.exec.handleErrorWith(e => cancelFibers(fibs, e, promise) >> F.raiseError(e)) + tr <- joinOrCancel(fibs, HNil)(true) + // Casting here is fine since we have a `Witness` that proves this true + _ <- promise.complete(tr.asInstanceOf[w.R].asRight) + } yield () + case ((fibs: HList), ExitCase.Error(e)) => + F.error(s"Transaction failed: ${e.getMessage}") >> + cmd.discard.guarantee(cancelFibers(fibs, TransactionAborted, promise)) + case ((fibs: HList), ExitCase.Canceled) => + F.error("Transaction canceled") >> + cmd.discard.guarantee(cancelFibers(fibs, TransactionAborted, promise)) + case _ => + F.error("Kernel panic: the impossible happened!") + } + .use(_ => F.unit) >> promise.get.rethrow.timeout(3.seconds) } } diff --git a/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisPipelineDemo.scala b/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisPipelineDemo.scala index b570bcf0..47622886 100644 --- a/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisPipelineDemo.scala +++ b/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisPipelineDemo.scala @@ -20,18 +20,20 @@ import cats.effect._ import cats.implicits._ import dev.profunktor.redis4cats.connection._ import dev.profunktor.redis4cats.effect.Log +import dev.profunktor.redis4cats.hlist._ import dev.profunktor.redis4cats.pipeline._ -import scala.concurrent.duration._ +import java.util.concurrent.TimeoutException object RedisPipelineDemo extends LoggerIOApp { import Demo._ def program(implicit log: Log[IO]): IO[Unit] = { - val key = "testp" + val key1 = "testp1" + val key2 = "testp2" - val showResult: Int => Option[String] => IO[Unit] = n => - _.fold(putStrLn(s"Not found key $key-$n"))(s => putStrLn(s)) + val showResult: String => Option[String] => IO[Unit] = key => + _.fold(putStrLn(s"Not found key: $key"))(s => putStrLn(s"$key: $s")) val commandsApi: Resource[IO, RedisCommands[IO, String, String]] = for { @@ -42,16 +44,29 @@ object RedisPipelineDemo extends LoggerIOApp { commandsApi .use { cmd => - def traversal(f: Int => IO[Unit]): IO[Unit] = - List.range(0, 50).traverse(f).void + val getters = + cmd.get(key1).flatTap(showResult(key1)) *> + cmd.get(key2).flatTap(showResult(key2)) - val setters: IO[Unit] = - traversal(n => cmd.set(s"$key-$n", (n * 2).toString).start.void) + val operations = + cmd.set(key1, "noop") :: cmd.set(key2, "windows") :: cmd.get(key1) :: + cmd.set(key1, "nix") :: cmd.set(key2, "linux") :: cmd.get(key1) :: HNil - val getters: IO[Unit] = - traversal(n => cmd.get(s"$key-$n").flatMap(showResult(n))) + val prog = + RedisPipeline(cmd) + .exec(operations) + .flatMap { + case _ ~: _ ~: res1 ~: _ ~: _ ~: res2 ~: HNil => + putStrLn(s"res1: $res1, res2: $res2") + } + .onError { + case PipelineError => + putStrLn("[Error] - Pipeline failed") + case _: TimeoutException => + putStrLn("[Error] - Timeout") + } - RedisPipeline(cmd).run(setters) *> IO.sleep(2.seconds) *> getters + getters >> prog >> getters >> putStrLn("keep doing stuff...") } } diff --git a/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisTransactionsDemo.scala b/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisTransactionsDemo.scala index 4020bbe7..9c5f131b 100644 --- a/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisTransactionsDemo.scala +++ b/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisTransactionsDemo.scala @@ -44,8 +44,6 @@ object RedisTransactionsDemo extends LoggerIOApp { commandsApi .use { cmd => - val tx = RedisTransaction(cmd) - val getters = cmd.get(key1).flatTap(showResult(key1)) *> cmd.get(key2).flatTap(showResult(key2)) @@ -59,7 +57,8 @@ object RedisTransactionsDemo extends LoggerIOApp { //type Res = Unit :: Unit :: Option[String] :: Unit :: Unit :: Option[String] :: HNil val prog = - tx.exec(operations) + RedisTransaction(cmd) + .exec(operations) .flatMap { case _ ~: _ ~: res1 ~: _ ~: _ ~: res2 ~: HNil => putStrLn(s"res1: $res1, res2: $res2") diff --git a/site/docs/pipelining.md b/site/docs/pipelining.md index ffee5e88..3c56105f 100644 --- a/site/docs/pipelining.md +++ b/site/docs/pipelining.md @@ -11,15 +11,13 @@ Use [pipelining](https://redis.io/topics/pipelining) to speed up your queries by `redis4cats` provides a `RedisPipeline` utility that models this behavior with some guarantees described below: -- `acquire`: disable autoflush. -- `use`: send a bunch of commands defined as `F[A]`. +- `acquire`: disable autoflush and send a bunch of commands defined as a custom `HList`. - `release`: either flush commands on success or log error on failure / cancellation. - `guarantee`: re-enable autoflush. ### RedisPipeline usage -The API for disabling / enabling autoflush and flush commands manually is available for you to use but since the pattern -is so common it is recommended to just use `RedisPipeline`. You can create a pipeline by passing the commands API as a parameter and invoke the `run` function given the set of commands you wish to send to the server. +The API for disabling / enabling autoflush and flush commands manually is available for you to use but since the pattern is so common it is recommended to just use `RedisPipeline`. You can create a pipeline by passing the commands API as a parameter and invoke the `exec` function given the set of commands you wish to send to the server. Note that every command has to be forked (`.start`) because the commands need to be sent to the server in an asynchronous way but no response will be received until the commands are successfully flushed. Also, it is not possible to sequence commands (`flatMap`) that are part of a pipeline. Every command has to be atomic and independent of previous results. @@ -43,7 +41,9 @@ val commandsApi: Resource[IO, RedisCommands[IO, String, String]] = { import cats.effect.IO import cats.implicits._ import dev.profunktor.redis4cats._ +import dev.profunktor.redis4cats.hlist._ import dev.profunktor.redis4cats.pipeline._ +import java.util.concurrent.TimeoutException import scala.concurrent.duration._ import scala.concurrent.ExecutionContext @@ -51,22 +51,36 @@ implicit val timer = IO.timer(ExecutionContext.global) def putStrLn(str: String): IO[Unit] = IO(println(str)) -val key = "testp" +val key1 = "testp1" +val key2 = "testp2" -val showResult: Int => Option[String] => IO[Unit] = n => - _.fold(putStrLn(s"Not found key $key-$n"))(s => putStrLn(s)) +val showResult: String => Option[String] => IO[Unit] = key => +_.fold(putStrLn(s"Not found key: $key"))(s => putStrLn(s"$key: $s")) commandsApi.use { cmd => // RedisCommands[IO, String, String] - def traversal(f: Int => IO[Unit]): IO[Unit] = - List.range(0, 50).traverse(f).void - - val setters: IO[Unit] = - traversal(n => cmd.set(s"$key-$n", (n * 2).toString).start.void) - - val getters: IO[Unit] = - traversal(n => cmd.get(s"$key-$n").flatMap(showResult(n))) - - RedisPipeline(cmd).run(setters) *> IO.sleep(2.seconds) *> getters + val getters = + cmd.get(key1).flatTap(showResult(key1)) *> + cmd.get(key2).flatTap(showResult(key2)) + + val operations = + cmd.set(key1, "noop") :: cmd.set(key2, "windows") :: cmd.get(key1) :: + cmd.set(key1, "nix") :: cmd.set(key2, "linux") :: cmd.get(key1) :: HNil + + val prog = + RedisPipeline(cmd) + .exec(operations) + .flatMap { + case _ ~: _ ~: res1 ~: _ ~: _ ~: res2 ~: HNil => + putStrLn(s"res1: $res1, res2: $res2") + } + .onError { + case PipelineError => + putStrLn("[Error] - Pipeline failed") + case _: TimeoutException => + putStrLn("[Error] - Timeout") + } + + getters >> prog >> getters >> putStrLn("keep doing stuff...") } ``` From dbaa99031d783d94922eb3b0cd4b3cf4847ab2ed Mon Sep 17 00:00:00 2001 From: Gabriel Volpe Date: Sat, 9 May 2020 12:35:00 +0200 Subject: [PATCH 2/2] Generic HList runner for both transactions and pipelining --- .../dev/profunktor/redis4cats/pipeline.scala | 38 ++++--------- .../dev/profunktor/redis4cats/runner.scala | 55 +++++++++++++++++-- .../profunktor/redis4cats/transactions.scala | 38 ++++--------- .../redis4cats/RedisClusterSpec.scala | 2 + .../dev/profunktor/redis4cats/RedisSpec.scala | 2 + .../profunktor/redis4cats/TestScenarios.scala | 26 +++++++-- 6 files changed, 97 insertions(+), 64 deletions(-) diff --git a/modules/effects/src/main/scala/dev/profunktor/redis4cats/pipeline.scala b/modules/effects/src/main/scala/dev/profunktor/redis4cats/pipeline.scala index 25883eb2..f86ee4ec 100644 --- a/modules/effects/src/main/scala/dev/profunktor/redis4cats/pipeline.scala +++ b/modules/effects/src/main/scala/dev/profunktor/redis4cats/pipeline.scala @@ -17,12 +17,8 @@ package dev.profunktor.redis4cats import cats.effect._ -import cats.effect.concurrent.Deferred -import cats.effect.implicits._ -import cats.implicits._ import dev.profunktor.redis4cats.effect.Log import dev.profunktor.redis4cats.hlist._ -import scala.concurrent.duration._ import scala.util.control.NoStackTrace object pipeline { @@ -31,31 +27,19 @@ object pipeline { case class RedisPipeline[F[_]: Concurrent: Log: Timer, K, V]( cmd: RedisCommands[F, K, V] - ) extends HListRunner[F] { + ) { def exec[T <: HList, R <: HList](commands: T)(implicit w: Witness.Aux[T, R]): F[R] = - Deferred[F, Either[Throwable, w.R]].flatMap { promise => - F.info("Pipeline started") >> - Resource - .makeCase(cmd.disableAutoFlush >> runner(commands, HNil)) { - case ((fibs: HList), ExitCase.Completed) => - for { - _ <- F.info("Pipeline completed") - _ <- cmd.flushCommands - tr <- joinOrCancel(fibs, HNil)(true) - // Casting here is fine since we have a `Witness` that proves this true - _ <- promise.complete(tr.asInstanceOf[w.R].asRight) - } yield () - case ((fibs: HList), ExitCase.Error(e)) => - F.error(s"Pipeline failed: ${e.getMessage}") >> cancelFibers(fibs, PipelineError, promise) - case ((fibs: HList), ExitCase.Canceled) => - F.error("Pipeline canceled") >> cancelFibers(fibs, PipelineError, promise) - case _ => - F.error("Kernel panic: the impossible happened!") - } - .use(_ => F.unit) - .guarantee(cmd.enableAutoFlush) >> promise.get.rethrow.timeout(3.seconds) - } + Runner[F].exec( + Runner.Ops( + name = "Pipeline", + mainCmd = cmd.disableAutoFlush, + onComplete = (_: Runner.CancelFibers[F]) => cmd.flushCommands, + onError = F.unit, + afterCompletion = cmd.enableAutoFlush, + mkError = () => PipelineError + ) + )(commands) } diff --git a/modules/effects/src/main/scala/dev/profunktor/redis4cats/runner.scala b/modules/effects/src/main/scala/dev/profunktor/redis4cats/runner.scala index 3d9ebec6..517c987f 100644 --- a/modules/effects/src/main/scala/dev/profunktor/redis4cats/runner.scala +++ b/modules/effects/src/main/scala/dev/profunktor/redis4cats/runner.scala @@ -22,18 +22,64 @@ import cats.effect.implicits._ import cats.implicits._ import dev.profunktor.redis4cats.effect.Log import dev.profunktor.redis4cats.hlist._ +import scala.concurrent.duration._ -class HListRunner[F[_]: Concurrent: Log] { +object Runner { + type CancelFibers[F[_]] = Throwable => F[Unit] + + case class Ops[F[_]]( + name: String, + mainCmd: F[Unit], + onComplete: CancelFibers[F] => F[Unit], + onError: F[Unit], + afterCompletion: F[Unit], + mkError: () => Throwable + ) + + def apply[F[_]: Concurrent: Log: Timer]: RunnerPartiallyApplied[F] = + new RunnerPartiallyApplied[F] +} + +private[redis4cats] class RunnerPartiallyApplied[F[_]: Concurrent: Log: Timer] { + + def exec[T <: HList, R <: HList](ops: Runner.Ops[F])(commands: T)(implicit w: Witness.Aux[T, R]): F[R] = + Deferred[F, Either[Throwable, w.R]].flatMap { promise => + def cancelFibers[A](fibs: HList)(err: Throwable): F[Unit] = + joinOrCancel(fibs, HNil)(false).void >> promise.complete(err.asLeft) + + F.info(s"${ops.name} started") >> + Resource + .makeCase(ops.mainCmd >> runner(commands, HNil)) { + case ((fibs: HList), ExitCase.Completed) => + for { + _ <- F.info(s"${ops.name} completed") + _ <- ops.onComplete(cancelFibers(fibs)) + tr <- joinOrCancel(fibs, HNil)(true) + // Casting here is fine since we have a `Witness` that proves this true + _ <- promise.complete(tr.asInstanceOf[w.R].asRight) + } yield () + case ((fibs: HList), ExitCase.Error(e)) => + F.error(s"${ops.name} failed: ${e.getMessage}") >> + ops.onError.guarantee(cancelFibers(fibs)(ops.mkError())) + case ((fibs: HList), ExitCase.Canceled) => + F.error(s"${ops.name} canceled") >> + ops.onError.guarantee(cancelFibers(fibs)(ops.mkError())) + case _ => + F.error("Kernel panic: the impossible happened!") + } + .use(_ => F.unit) + .guarantee(ops.afterCompletion) >> promise.get.rethrow.timeout(3.seconds) + } // Forks every command in order - def runner[H <: HList, G <: HList](ys: H, res: G): F[Any] = + private def runner[H <: HList, G <: HList](ys: H, res: G): F[Any] = ys match { case HNil => F.pure(res) case HCons((h: F[_] @unchecked), t) => h.start.flatMap(fb => runner(t, fb :: res)) } // Joins or cancel fibers correspondent to previous executed commands - def joinOrCancel[H <: HList, G <: HList](ys: H, res: G)(isJoin: Boolean): F[Any] = + private def joinOrCancel[H <: HList, G <: HList](ys: H, res: G)(isJoin: Boolean): F[Any] = ys match { case HNil => F.pure(res) case HCons((h: Fiber[F, Any] @unchecked), t) if isJoin => @@ -44,7 +90,4 @@ class HListRunner[F[_]: Concurrent: Log] { F.error(s"Unexpected result: ${h.toString}") >> joinOrCancel(t, res)(isJoin) } - def cancelFibers[A](fibs: HList, err: Throwable, promise: Deferred[F, Either[Throwable, A]]): F[Unit] = - joinOrCancel(fibs, HNil)(false).void >> promise.complete(err.asLeft) - } diff --git a/modules/effects/src/main/scala/dev/profunktor/redis4cats/transactions.scala b/modules/effects/src/main/scala/dev/profunktor/redis4cats/transactions.scala index 0343b975..22e04128 100644 --- a/modules/effects/src/main/scala/dev/profunktor/redis4cats/transactions.scala +++ b/modules/effects/src/main/scala/dev/profunktor/redis4cats/transactions.scala @@ -17,12 +17,9 @@ package dev.profunktor.redis4cats import cats.effect._ -import cats.effect.concurrent._ -import cats.effect.implicits._ import cats.implicits._ import dev.profunktor.redis4cats.effect.Log import dev.profunktor.redis4cats.hlist._ -import scala.concurrent.duration._ import scala.util.control.NoStackTrace object transactions { @@ -33,7 +30,7 @@ object transactions { case class RedisTransaction[F[_]: Concurrent: Log: Timer, K, V]( cmd: RedisCommands[F, K, V] - ) extends HListRunner[F] { + ) { /*** * Exclusively run Redis commands as part of a transaction. @@ -46,29 +43,16 @@ object transactions { * may end in unexpected results such as a dead lock. */ def exec[T <: HList, R <: HList](commands: T)(implicit w: Witness.Aux[T, R]): F[R] = - Deferred[F, Either[Throwable, w.R]].flatMap { promise => - F.info("Transaction started") >> - Resource - .makeCase(cmd.multi >> runner(commands, HNil)) { - case ((fibs: HList), ExitCase.Completed) => - for { - _ <- F.info("Transaction completed") - _ <- cmd.exec.handleErrorWith(e => cancelFibers(fibs, e, promise) >> F.raiseError(e)) - tr <- joinOrCancel(fibs, HNil)(true) - // Casting here is fine since we have a `Witness` that proves this true - _ <- promise.complete(tr.asInstanceOf[w.R].asRight) - } yield () - case ((fibs: HList), ExitCase.Error(e)) => - F.error(s"Transaction failed: ${e.getMessage}") >> - cmd.discard.guarantee(cancelFibers(fibs, TransactionAborted, promise)) - case ((fibs: HList), ExitCase.Canceled) => - F.error("Transaction canceled") >> - cmd.discard.guarantee(cancelFibers(fibs, TransactionAborted, promise)) - case _ => - F.error("Kernel panic: the impossible happened!") - } - .use(_ => F.unit) >> promise.get.rethrow.timeout(3.seconds) - } + Runner[F].exec( + Runner.Ops( + name = "Transaction", + mainCmd = cmd.multi, + onComplete = (f: Runner.CancelFibers[F]) => cmd.exec.handleErrorWith(e => f(e) >> F.raiseError(e)), + onError = cmd.discard, + afterCompletion = F.unit, + mkError = () => TransactionAborted + ) + )(commands) } diff --git a/modules/tests/src/test/scala/dev/profunktor/redis4cats/RedisClusterSpec.scala b/modules/tests/src/test/scala/dev/profunktor/redis4cats/RedisClusterSpec.scala index 2f472786..5192cb45 100644 --- a/modules/tests/src/test/scala/dev/profunktor/redis4cats/RedisClusterSpec.scala +++ b/modules/tests/src/test/scala/dev/profunktor/redis4cats/RedisClusterSpec.scala @@ -40,6 +40,8 @@ class RedisClusterSpec extends Redis4CatsFunSuite(true) with TestScenarios { test("cluster: scripts")(withRedis(scriptsScenario)) + test("cluster: pipelining")(withRedisCluster(pipelineScenario)) + // FIXME: The Cluster impl cannot connect to a single node just yet // test("cluster: transactions")(withRedisCluster(transactionScenario)) diff --git a/modules/tests/src/test/scala/dev/profunktor/redis4cats/RedisSpec.scala b/modules/tests/src/test/scala/dev/profunktor/redis4cats/RedisSpec.scala index a41efbcb..b1ad479a 100644 --- a/modules/tests/src/test/scala/dev/profunktor/redis4cats/RedisSpec.scala +++ b/modules/tests/src/test/scala/dev/profunktor/redis4cats/RedisSpec.scala @@ -36,6 +36,8 @@ class RedisSpec extends Redis4CatsFunSuite(false) with TestScenarios { test("connection api")(withRedis(connectionScenario)) + test("pipelining")(withRedis(pipelineScenario)) + test("transactions: successful")(withRedis(transactionScenario)) test("transactions: canceled")(withRedis(canceledTransactionScenario)) diff --git a/modules/tests/src/test/scala/dev/profunktor/redis4cats/TestScenarios.scala b/modules/tests/src/test/scala/dev/profunktor/redis4cats/TestScenarios.scala index 643cf719..97822edd 100644 --- a/modules/tests/src/test/scala/dev/profunktor/redis4cats/TestScenarios.scala +++ b/modules/tests/src/test/scala/dev/profunktor/redis4cats/TestScenarios.scala @@ -23,7 +23,8 @@ import cats.implicits._ import dev.profunktor.redis4cats.effect.Log import dev.profunktor.redis4cats.effects._ import dev.profunktor.redis4cats.hlist._ -import dev.profunktor.redis4cats.transactions._ +import dev.profunktor.redis4cats.pipeline.RedisPipeline +import dev.profunktor.redis4cats.transactions.RedisTransaction import io.lettuce.core.GeoArgs import scala.concurrent.duration._ @@ -249,17 +250,34 @@ trait TestScenarios { _ <- IO(assert(slowLogLen.isValidLong)) } yield () + def pipelineScenario(cmd: RedisCommands[IO, String, String]): IO[Unit] = { + val key1 = "testp1" + val key2 = "testp2" + + val operations = + cmd.set(key1, "osx") :: cmd.set(key2, "windows") :: cmd.get(key1) :: cmd.sIsMember("foo", "bar") :: + cmd.set(key1, "nix") :: cmd.set(key2, "linux") :: cmd.get(key1) :: HNil + + RedisPipeline(cmd).exec(operations).map { + case _ ~: _ ~: res1 ~: res2 ~: _ ~: _ ~: res3 ~: HNil => + assert(res1.contains("osx")) + assert(res2 === false) + assert(res3.contains("nix")) + case tr => + assert(false, s"Unexpected result: $tr") + } + + } + def transactionScenario(cmd: RedisCommands[IO, String, String]): IO[Unit] = { val key1 = "test1" val key2 = "test2" - val tx = RedisTransaction(cmd) - val operations = cmd.set(key1, "osx") :: cmd.set(key2, "windows") :: cmd.get(key1) :: cmd.sIsMember("foo", "bar") :: cmd.set(key1, "nix") :: cmd.set(key2, "linux") :: cmd.get(key1) :: HNil - tx.exec(operations).map { + RedisTransaction(cmd).exec(operations).map { case _ ~: _ ~: res1 ~: res2 ~: _ ~: _ ~: res3 ~: HNil => assert(res1.contains("osx")) assert(res2 === false)