From 5b9629b05e44ff52a29091d8a8f2baac1d54c877 Mon Sep 17 00:00:00 2001 From: Gabriel Volpe Date: Sat, 9 May 2020 00:31:53 +0200 Subject: [PATCH] Redis pipelining as in transactions; --- .../dev/profunktor/redis4cats/pipeline.scala | 42 +++++++++--- .../dev/profunktor/redis4cats/redis.scala | 6 +- .../dev/profunktor/redis4cats/runner.scala | 50 ++++++++++++++ .../profunktor/redis4cats/transactions.scala | 65 ++++++------------- .../redis4cats/RedisPipelineDemo.scala | 37 +++++++---- .../redis4cats/RedisTransactionsDemo.scala | 5 +- site/docs/pipelining.md | 48 +++++++++----- 7 files changed, 165 insertions(+), 88 deletions(-) create mode 100644 modules/effects/src/main/scala/dev/profunktor/redis4cats/runner.scala diff --git a/modules/effects/src/main/scala/dev/profunktor/redis4cats/pipeline.scala b/modules/effects/src/main/scala/dev/profunktor/redis4cats/pipeline.scala index e1b045b8..25883eb2 100644 --- a/modules/effects/src/main/scala/dev/profunktor/redis4cats/pipeline.scala +++ b/modules/effects/src/main/scala/dev/profunktor/redis4cats/pipeline.scala @@ -17,24 +17,46 @@ package dev.profunktor.redis4cats import cats.effect._ +import cats.effect.concurrent.Deferred import cats.effect.implicits._ import cats.implicits._ import dev.profunktor.redis4cats.effect.Log +import dev.profunktor.redis4cats.hlist._ +import scala.concurrent.duration._ +import scala.util.control.NoStackTrace object pipeline { - case class RedisPipeline[F[_]: Bracket[*[_], Throwable]: Log, K, V]( + case object PipelineError extends NoStackTrace + + case class RedisPipeline[F[_]: Concurrent: Log: Timer, K, V]( cmd: RedisCommands[F, K, V] - ) { - def run[A](fa: F[A]): F[A] = - F.info("Pipeline started") *> - cmd.disableAutoFlush - .bracketCase(_ => fa) { - case (_, ExitCase.Completed) => cmd.flushCommands *> F.info("Pipeline completed") - case (_, ExitCase.Error(e)) => F.error(s"Pipeline failed: ${e.getMessage}") - case (_, ExitCase.Canceled) => F.error("Pipeline canceled") + ) extends HListRunner[F] { + + def exec[T <: HList, R <: HList](commands: T)(implicit w: Witness.Aux[T, R]): F[R] = + Deferred[F, Either[Throwable, w.R]].flatMap { promise => + F.info("Pipeline started") >> + Resource + .makeCase(cmd.disableAutoFlush >> runner(commands, HNil)) { + case ((fibs: HList), ExitCase.Completed) => + for { + _ <- F.info("Pipeline completed") + _ <- cmd.flushCommands + tr <- joinOrCancel(fibs, HNil)(true) + // Casting here is fine since we have a `Witness` that proves this true + _ <- promise.complete(tr.asInstanceOf[w.R].asRight) + } yield () + case ((fibs: HList), ExitCase.Error(e)) => + F.error(s"Pipeline failed: ${e.getMessage}") >> cancelFibers(fibs, PipelineError, promise) + case ((fibs: HList), ExitCase.Canceled) => + F.error("Pipeline canceled") >> cancelFibers(fibs, PipelineError, promise) + case _ => + F.error("Kernel panic: the impossible happened!") } - .guarantee(cmd.enableAutoFlush) + .use(_ => F.unit) + .guarantee(cmd.enableAutoFlush) >> promise.get.rethrow.timeout(3.seconds) + } + } } diff --git a/modules/effects/src/main/scala/dev/profunktor/redis4cats/redis.scala b/modules/effects/src/main/scala/dev/profunktor/redis4cats/redis.scala index c2ce157b..52421837 100644 --- a/modules/effects/src/main/scala/dev/profunktor/redis4cats/redis.scala +++ b/modules/effects/src/main/scala/dev/profunktor/redis4cats/redis.scala @@ -259,13 +259,13 @@ private[redis4cats] class BaseRedis[F[_]: Concurrent: ContextShift, K, V]( /******************************* AutoFlush API **********************************/ override def enableAutoFlush: F[Unit] = - async.flatMap(c => F.delay(c.setAutoFlushCommands(true))) + blocker.blockOn(async.flatMap(c => blocker.delay(c.setAutoFlushCommands(true)))) override def disableAutoFlush: F[Unit] = - async.flatMap(c => F.delay(c.setAutoFlushCommands(false))) + blocker.blockOn(async.flatMap(c => blocker.delay(c.setAutoFlushCommands(false)))) override def flushCommands: F[Unit] = - async.flatMap(c => F.delay(c.flushCommands())) + blocker.blockOn(async.flatMap(c => blocker.delay(c.flushCommands()))) /******************************* Strings API **********************************/ override def append(key: K, value: V): F[Unit] = diff --git a/modules/effects/src/main/scala/dev/profunktor/redis4cats/runner.scala b/modules/effects/src/main/scala/dev/profunktor/redis4cats/runner.scala new file mode 100644 index 00000000..3d9ebec6 --- /dev/null +++ b/modules/effects/src/main/scala/dev/profunktor/redis4cats/runner.scala @@ -0,0 +1,50 @@ +/* + * Copyright 2018-2020 ProfunKtor + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.profunktor.redis4cats + +import cats.effect._ +import cats.effect.concurrent.Deferred +import cats.effect.implicits._ +import cats.implicits._ +import dev.profunktor.redis4cats.effect.Log +import dev.profunktor.redis4cats.hlist._ + +class HListRunner[F[_]: Concurrent: Log] { + + // Forks every command in order + def runner[H <: HList, G <: HList](ys: H, res: G): F[Any] = + ys match { + case HNil => F.pure(res) + case HCons((h: F[_] @unchecked), t) => h.start.flatMap(fb => runner(t, fb :: res)) + } + + // Joins or cancel fibers correspondent to previous executed commands + def joinOrCancel[H <: HList, G <: HList](ys: H, res: G)(isJoin: Boolean): F[Any] = + ys match { + case HNil => F.pure(res) + case HCons((h: Fiber[F, Any] @unchecked), t) if isJoin => + h.join.flatMap(x => joinOrCancel(t, x :: res)(isJoin)) + case HCons((h: Fiber[F, Any] @unchecked), t) => + h.cancel.flatMap(x => joinOrCancel(t, x :: res)(isJoin)) + case HCons(h, t) => + F.error(s"Unexpected result: ${h.toString}") >> joinOrCancel(t, res)(isJoin) + } + + def cancelFibers[A](fibs: HList, err: Throwable, promise: Deferred[F, Either[Throwable, A]]): F[Unit] = + joinOrCancel(fibs, HNil)(false).void >> promise.complete(err.asLeft) + +} diff --git a/modules/effects/src/main/scala/dev/profunktor/redis4cats/transactions.scala b/modules/effects/src/main/scala/dev/profunktor/redis4cats/transactions.scala index 5b701079..0343b975 100644 --- a/modules/effects/src/main/scala/dev/profunktor/redis4cats/transactions.scala +++ b/modules/effects/src/main/scala/dev/profunktor/redis4cats/transactions.scala @@ -33,7 +33,7 @@ object transactions { case class RedisTransaction[F[_]: Concurrent: Log: Timer, K, V]( cmd: RedisCommands[F, K, V] - ) { + ) extends HListRunner[F] { /*** * Exclusively run Redis commands as part of a transaction. @@ -47,50 +47,27 @@ object transactions { */ def exec[T <: HList, R <: HList](commands: T)(implicit w: Witness.Aux[T, R]): F[R] = Deferred[F, Either[Throwable, w.R]].flatMap { promise => - // Forks every command in order - def runner[H <: HList, G <: HList](ys: H, res: G): F[Any] = - ys match { - case HNil => F.pure(res) - case HCons((h: F[_] @unchecked), t) => h.start.flatMap(fb => runner(t, fb :: res)) - } - - // Joins or cancel fibers correspondent to previous executed commands - def joinOrCancel[H <: HList, G <: HList](ys: H, res: G)(isJoin: Boolean): F[Any] = - ys match { - case HNil => F.pure(res) - case HCons((h: Fiber[F, Any] @unchecked), t) if isJoin => - h.join.flatMap(x => joinOrCancel(t, x :: res)(isJoin)) - case HCons((h: Fiber[F, Any] @unchecked), t) => - h.cancel.flatMap(x => joinOrCancel(t, x :: res)(isJoin)) - case HCons(h, t) => - F.error(s"Unexpected result: ${h.toString}") >> joinOrCancel(t, res)(isJoin) - } - - def cancelFibers(fibs: HList, err: Throwable = TransactionAborted): F[Unit] = - joinOrCancel(fibs, HNil)(false).void >> promise.complete(err.asLeft) - - val tx = - Resource.makeCase(cmd.multi >> runner(commands, HNil)) { - case ((fibs: HList), ExitCase.Completed) => - for { - _ <- F.info("Transaction completed") - _ <- cmd.exec.handleErrorWith(e => cancelFibers(fibs, e) >> F.raiseError(e)) - tr <- joinOrCancel(fibs, HNil)(true) - // Casting here is fine since we have a `Witness` that proves this true - _ <- promise.complete(tr.asInstanceOf[w.R].asRight) - } yield () - case ((fibs: HList), ExitCase.Error(e)) => - F.error(s"Transaction failed: ${e.getMessage}") >> - cmd.discard.guarantee(cancelFibers(fibs)) - case ((fibs: HList), ExitCase.Canceled) => - F.error("Transaction canceled") >> - cmd.discard.guarantee(cancelFibers(fibs)) - case _ => - F.error("Kernel panic: the impossible happened!") - } - F.info("Transaction started") >> - (tx.use(_ => F.unit) >> promise.get.rethrow).timeout(3.seconds) + Resource + .makeCase(cmd.multi >> runner(commands, HNil)) { + case ((fibs: HList), ExitCase.Completed) => + for { + _ <- F.info("Transaction completed") + _ <- cmd.exec.handleErrorWith(e => cancelFibers(fibs, e, promise) >> F.raiseError(e)) + tr <- joinOrCancel(fibs, HNil)(true) + // Casting here is fine since we have a `Witness` that proves this true + _ <- promise.complete(tr.asInstanceOf[w.R].asRight) + } yield () + case ((fibs: HList), ExitCase.Error(e)) => + F.error(s"Transaction failed: ${e.getMessage}") >> + cmd.discard.guarantee(cancelFibers(fibs, TransactionAborted, promise)) + case ((fibs: HList), ExitCase.Canceled) => + F.error("Transaction canceled") >> + cmd.discard.guarantee(cancelFibers(fibs, TransactionAborted, promise)) + case _ => + F.error("Kernel panic: the impossible happened!") + } + .use(_ => F.unit) >> promise.get.rethrow.timeout(3.seconds) } } diff --git a/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisPipelineDemo.scala b/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisPipelineDemo.scala index b570bcf0..47622886 100644 --- a/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisPipelineDemo.scala +++ b/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisPipelineDemo.scala @@ -20,18 +20,20 @@ import cats.effect._ import cats.implicits._ import dev.profunktor.redis4cats.connection._ import dev.profunktor.redis4cats.effect.Log +import dev.profunktor.redis4cats.hlist._ import dev.profunktor.redis4cats.pipeline._ -import scala.concurrent.duration._ +import java.util.concurrent.TimeoutException object RedisPipelineDemo extends LoggerIOApp { import Demo._ def program(implicit log: Log[IO]): IO[Unit] = { - val key = "testp" + val key1 = "testp1" + val key2 = "testp2" - val showResult: Int => Option[String] => IO[Unit] = n => - _.fold(putStrLn(s"Not found key $key-$n"))(s => putStrLn(s)) + val showResult: String => Option[String] => IO[Unit] = key => + _.fold(putStrLn(s"Not found key: $key"))(s => putStrLn(s"$key: $s")) val commandsApi: Resource[IO, RedisCommands[IO, String, String]] = for { @@ -42,16 +44,29 @@ object RedisPipelineDemo extends LoggerIOApp { commandsApi .use { cmd => - def traversal(f: Int => IO[Unit]): IO[Unit] = - List.range(0, 50).traverse(f).void + val getters = + cmd.get(key1).flatTap(showResult(key1)) *> + cmd.get(key2).flatTap(showResult(key2)) - val setters: IO[Unit] = - traversal(n => cmd.set(s"$key-$n", (n * 2).toString).start.void) + val operations = + cmd.set(key1, "noop") :: cmd.set(key2, "windows") :: cmd.get(key1) :: + cmd.set(key1, "nix") :: cmd.set(key2, "linux") :: cmd.get(key1) :: HNil - val getters: IO[Unit] = - traversal(n => cmd.get(s"$key-$n").flatMap(showResult(n))) + val prog = + RedisPipeline(cmd) + .exec(operations) + .flatMap { + case _ ~: _ ~: res1 ~: _ ~: _ ~: res2 ~: HNil => + putStrLn(s"res1: $res1, res2: $res2") + } + .onError { + case PipelineError => + putStrLn("[Error] - Pipeline failed") + case _: TimeoutException => + putStrLn("[Error] - Timeout") + } - RedisPipeline(cmd).run(setters) *> IO.sleep(2.seconds) *> getters + getters >> prog >> getters >> putStrLn("keep doing stuff...") } } diff --git a/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisTransactionsDemo.scala b/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisTransactionsDemo.scala index 4020bbe7..9c5f131b 100644 --- a/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisTransactionsDemo.scala +++ b/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisTransactionsDemo.scala @@ -44,8 +44,6 @@ object RedisTransactionsDemo extends LoggerIOApp { commandsApi .use { cmd => - val tx = RedisTransaction(cmd) - val getters = cmd.get(key1).flatTap(showResult(key1)) *> cmd.get(key2).flatTap(showResult(key2)) @@ -59,7 +57,8 @@ object RedisTransactionsDemo extends LoggerIOApp { //type Res = Unit :: Unit :: Option[String] :: Unit :: Unit :: Option[String] :: HNil val prog = - tx.exec(operations) + RedisTransaction(cmd) + .exec(operations) .flatMap { case _ ~: _ ~: res1 ~: _ ~: _ ~: res2 ~: HNil => putStrLn(s"res1: $res1, res2: $res2") diff --git a/site/docs/pipelining.md b/site/docs/pipelining.md index ffee5e88..3c56105f 100644 --- a/site/docs/pipelining.md +++ b/site/docs/pipelining.md @@ -11,15 +11,13 @@ Use [pipelining](https://redis.io/topics/pipelining) to speed up your queries by `redis4cats` provides a `RedisPipeline` utility that models this behavior with some guarantees described below: -- `acquire`: disable autoflush. -- `use`: send a bunch of commands defined as `F[A]`. +- `acquire`: disable autoflush and send a bunch of commands defined as a custom `HList`. - `release`: either flush commands on success or log error on failure / cancellation. - `guarantee`: re-enable autoflush. ### RedisPipeline usage -The API for disabling / enabling autoflush and flush commands manually is available for you to use but since the pattern -is so common it is recommended to just use `RedisPipeline`. You can create a pipeline by passing the commands API as a parameter and invoke the `run` function given the set of commands you wish to send to the server. +The API for disabling / enabling autoflush and flush commands manually is available for you to use but since the pattern is so common it is recommended to just use `RedisPipeline`. You can create a pipeline by passing the commands API as a parameter and invoke the `exec` function given the set of commands you wish to send to the server. Note that every command has to be forked (`.start`) because the commands need to be sent to the server in an asynchronous way but no response will be received until the commands are successfully flushed. Also, it is not possible to sequence commands (`flatMap`) that are part of a pipeline. Every command has to be atomic and independent of previous results. @@ -43,7 +41,9 @@ val commandsApi: Resource[IO, RedisCommands[IO, String, String]] = { import cats.effect.IO import cats.implicits._ import dev.profunktor.redis4cats._ +import dev.profunktor.redis4cats.hlist._ import dev.profunktor.redis4cats.pipeline._ +import java.util.concurrent.TimeoutException import scala.concurrent.duration._ import scala.concurrent.ExecutionContext @@ -51,22 +51,36 @@ implicit val timer = IO.timer(ExecutionContext.global) def putStrLn(str: String): IO[Unit] = IO(println(str)) -val key = "testp" +val key1 = "testp1" +val key2 = "testp2" -val showResult: Int => Option[String] => IO[Unit] = n => - _.fold(putStrLn(s"Not found key $key-$n"))(s => putStrLn(s)) +val showResult: String => Option[String] => IO[Unit] = key => +_.fold(putStrLn(s"Not found key: $key"))(s => putStrLn(s"$key: $s")) commandsApi.use { cmd => // RedisCommands[IO, String, String] - def traversal(f: Int => IO[Unit]): IO[Unit] = - List.range(0, 50).traverse(f).void - - val setters: IO[Unit] = - traversal(n => cmd.set(s"$key-$n", (n * 2).toString).start.void) - - val getters: IO[Unit] = - traversal(n => cmd.get(s"$key-$n").flatMap(showResult(n))) - - RedisPipeline(cmd).run(setters) *> IO.sleep(2.seconds) *> getters + val getters = + cmd.get(key1).flatTap(showResult(key1)) *> + cmd.get(key2).flatTap(showResult(key2)) + + val operations = + cmd.set(key1, "noop") :: cmd.set(key2, "windows") :: cmd.get(key1) :: + cmd.set(key1, "nix") :: cmd.set(key2, "linux") :: cmd.get(key1) :: HNil + + val prog = + RedisPipeline(cmd) + .exec(operations) + .flatMap { + case _ ~: _ ~: res1 ~: _ ~: _ ~: res2 ~: HNil => + putStrLn(s"res1: $res1, res2: $res2") + } + .onError { + case PipelineError => + putStrLn("[Error] - Pipeline failed") + case _: TimeoutException => + putStrLn("[Error] - Timeout") + } + + getters >> prog >> getters >> putStrLn("keep doing stuff...") } ```