Skip to content

Commit

Permalink
Redis pipelining as in transactions;
Browse files Browse the repository at this point in the history
  • Loading branch information
gvolpe committed May 8, 2020
1 parent 466437b commit 5b9629b
Show file tree
Hide file tree
Showing 7 changed files with 165 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,46 @@
package dev.profunktor.redis4cats

import cats.effect._
import cats.effect.concurrent.Deferred
import cats.effect.implicits._
import cats.implicits._
import dev.profunktor.redis4cats.effect.Log
import dev.profunktor.redis4cats.hlist._
import scala.concurrent.duration._
import scala.util.control.NoStackTrace

object pipeline {

case class RedisPipeline[F[_]: Bracket[*[_], Throwable]: Log, K, V](
case object PipelineError extends NoStackTrace

case class RedisPipeline[F[_]: Concurrent: Log: Timer, K, V](
cmd: RedisCommands[F, K, V]
) {
def run[A](fa: F[A]): F[A] =
F.info("Pipeline started") *>
cmd.disableAutoFlush
.bracketCase(_ => fa) {
case (_, ExitCase.Completed) => cmd.flushCommands *> F.info("Pipeline completed")
case (_, ExitCase.Error(e)) => F.error(s"Pipeline failed: ${e.getMessage}")
case (_, ExitCase.Canceled) => F.error("Pipeline canceled")
) extends HListRunner[F] {

def exec[T <: HList, R <: HList](commands: T)(implicit w: Witness.Aux[T, R]): F[R] =
Deferred[F, Either[Throwable, w.R]].flatMap { promise =>
F.info("Pipeline started") >>
Resource
.makeCase(cmd.disableAutoFlush >> runner(commands, HNil)) {
case ((fibs: HList), ExitCase.Completed) =>
for {
_ <- F.info("Pipeline completed")
_ <- cmd.flushCommands
tr <- joinOrCancel(fibs, HNil)(true)
// Casting here is fine since we have a `Witness` that proves this true
_ <- promise.complete(tr.asInstanceOf[w.R].asRight)
} yield ()
case ((fibs: HList), ExitCase.Error(e)) =>
F.error(s"Pipeline failed: ${e.getMessage}") >> cancelFibers(fibs, PipelineError, promise)
case ((fibs: HList), ExitCase.Canceled) =>
F.error("Pipeline canceled") >> cancelFibers(fibs, PipelineError, promise)
case _ =>
F.error("Kernel panic: the impossible happened!")
}
.guarantee(cmd.enableAutoFlush)
.use(_ => F.unit)
.guarantee(cmd.enableAutoFlush) >> promise.get.rethrow.timeout(3.seconds)
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -259,13 +259,13 @@ private[redis4cats] class BaseRedis[F[_]: Concurrent: ContextShift, K, V](

/******************************* AutoFlush API **********************************/
override def enableAutoFlush: F[Unit] =
async.flatMap(c => F.delay(c.setAutoFlushCommands(true)))
blocker.blockOn(async.flatMap(c => blocker.delay(c.setAutoFlushCommands(true))))

override def disableAutoFlush: F[Unit] =
async.flatMap(c => F.delay(c.setAutoFlushCommands(false)))
blocker.blockOn(async.flatMap(c => blocker.delay(c.setAutoFlushCommands(false))))

override def flushCommands: F[Unit] =
async.flatMap(c => F.delay(c.flushCommands()))
blocker.blockOn(async.flatMap(c => blocker.delay(c.flushCommands())))

/******************************* Strings API **********************************/
override def append(key: K, value: V): F[Unit] =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2018-2020 ProfunKtor
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package dev.profunktor.redis4cats

import cats.effect._
import cats.effect.concurrent.Deferred
import cats.effect.implicits._
import cats.implicits._
import dev.profunktor.redis4cats.effect.Log
import dev.profunktor.redis4cats.hlist._

class HListRunner[F[_]: Concurrent: Log] {

// Forks every command in order
def runner[H <: HList, G <: HList](ys: H, res: G): F[Any] =
ys match {
case HNil => F.pure(res)
case HCons((h: F[_] @unchecked), t) => h.start.flatMap(fb => runner(t, fb :: res))
}

// Joins or cancel fibers correspondent to previous executed commands
def joinOrCancel[H <: HList, G <: HList](ys: H, res: G)(isJoin: Boolean): F[Any] =
ys match {
case HNil => F.pure(res)
case HCons((h: Fiber[F, Any] @unchecked), t) if isJoin =>
h.join.flatMap(x => joinOrCancel(t, x :: res)(isJoin))
case HCons((h: Fiber[F, Any] @unchecked), t) =>
h.cancel.flatMap(x => joinOrCancel(t, x :: res)(isJoin))
case HCons(h, t) =>
F.error(s"Unexpected result: ${h.toString}") >> joinOrCancel(t, res)(isJoin)
}

def cancelFibers[A](fibs: HList, err: Throwable, promise: Deferred[F, Either[Throwable, A]]): F[Unit] =
joinOrCancel(fibs, HNil)(false).void >> promise.complete(err.asLeft)

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ object transactions {

case class RedisTransaction[F[_]: Concurrent: Log: Timer, K, V](
cmd: RedisCommands[F, K, V]
) {
) extends HListRunner[F] {

/***
* Exclusively run Redis commands as part of a transaction.
Expand All @@ -47,50 +47,27 @@ object transactions {
*/
def exec[T <: HList, R <: HList](commands: T)(implicit w: Witness.Aux[T, R]): F[R] =
Deferred[F, Either[Throwable, w.R]].flatMap { promise =>
// Forks every command in order
def runner[H <: HList, G <: HList](ys: H, res: G): F[Any] =
ys match {
case HNil => F.pure(res)
case HCons((h: F[_] @unchecked), t) => h.start.flatMap(fb => runner(t, fb :: res))
}

// Joins or cancel fibers correspondent to previous executed commands
def joinOrCancel[H <: HList, G <: HList](ys: H, res: G)(isJoin: Boolean): F[Any] =
ys match {
case HNil => F.pure(res)
case HCons((h: Fiber[F, Any] @unchecked), t) if isJoin =>
h.join.flatMap(x => joinOrCancel(t, x :: res)(isJoin))
case HCons((h: Fiber[F, Any] @unchecked), t) =>
h.cancel.flatMap(x => joinOrCancel(t, x :: res)(isJoin))
case HCons(h, t) =>
F.error(s"Unexpected result: ${h.toString}") >> joinOrCancel(t, res)(isJoin)
}

def cancelFibers(fibs: HList, err: Throwable = TransactionAborted): F[Unit] =
joinOrCancel(fibs, HNil)(false).void >> promise.complete(err.asLeft)

val tx =
Resource.makeCase(cmd.multi >> runner(commands, HNil)) {
case ((fibs: HList), ExitCase.Completed) =>
for {
_ <- F.info("Transaction completed")
_ <- cmd.exec.handleErrorWith(e => cancelFibers(fibs, e) >> F.raiseError(e))
tr <- joinOrCancel(fibs, HNil)(true)
// Casting here is fine since we have a `Witness` that proves this true
_ <- promise.complete(tr.asInstanceOf[w.R].asRight)
} yield ()
case ((fibs: HList), ExitCase.Error(e)) =>
F.error(s"Transaction failed: ${e.getMessage}") >>
cmd.discard.guarantee(cancelFibers(fibs))
case ((fibs: HList), ExitCase.Canceled) =>
F.error("Transaction canceled") >>
cmd.discard.guarantee(cancelFibers(fibs))
case _ =>
F.error("Kernel panic: the impossible happened!")
}

F.info("Transaction started") >>
(tx.use(_ => F.unit) >> promise.get.rethrow).timeout(3.seconds)
Resource
.makeCase(cmd.multi >> runner(commands, HNil)) {
case ((fibs: HList), ExitCase.Completed) =>
for {
_ <- F.info("Transaction completed")
_ <- cmd.exec.handleErrorWith(e => cancelFibers(fibs, e, promise) >> F.raiseError(e))
tr <- joinOrCancel(fibs, HNil)(true)
// Casting here is fine since we have a `Witness` that proves this true
_ <- promise.complete(tr.asInstanceOf[w.R].asRight)
} yield ()
case ((fibs: HList), ExitCase.Error(e)) =>
F.error(s"Transaction failed: ${e.getMessage}") >>
cmd.discard.guarantee(cancelFibers(fibs, TransactionAborted, promise))
case ((fibs: HList), ExitCase.Canceled) =>
F.error("Transaction canceled") >>
cmd.discard.guarantee(cancelFibers(fibs, TransactionAborted, promise))
case _ =>
F.error("Kernel panic: the impossible happened!")
}
.use(_ => F.unit) >> promise.get.rethrow.timeout(3.seconds)
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,20 @@ import cats.effect._
import cats.implicits._
import dev.profunktor.redis4cats.connection._
import dev.profunktor.redis4cats.effect.Log
import dev.profunktor.redis4cats.hlist._
import dev.profunktor.redis4cats.pipeline._
import scala.concurrent.duration._
import java.util.concurrent.TimeoutException

object RedisPipelineDemo extends LoggerIOApp {

import Demo._

def program(implicit log: Log[IO]): IO[Unit] = {
val key = "testp"
val key1 = "testp1"
val key2 = "testp2"

val showResult: Int => Option[String] => IO[Unit] = n =>
_.fold(putStrLn(s"Not found key $key-$n"))(s => putStrLn(s))
val showResult: String => Option[String] => IO[Unit] = key =>
_.fold(putStrLn(s"Not found key: $key"))(s => putStrLn(s"$key: $s"))

val commandsApi: Resource[IO, RedisCommands[IO, String, String]] =
for {
Expand All @@ -42,16 +44,29 @@ object RedisPipelineDemo extends LoggerIOApp {

commandsApi
.use { cmd =>
def traversal(f: Int => IO[Unit]): IO[Unit] =
List.range(0, 50).traverse(f).void
val getters =
cmd.get(key1).flatTap(showResult(key1)) *>
cmd.get(key2).flatTap(showResult(key2))

val setters: IO[Unit] =
traversal(n => cmd.set(s"$key-$n", (n * 2).toString).start.void)
val operations =
cmd.set(key1, "noop") :: cmd.set(key2, "windows") :: cmd.get(key1) ::
cmd.set(key1, "nix") :: cmd.set(key2, "linux") :: cmd.get(key1) :: HNil

val getters: IO[Unit] =
traversal(n => cmd.get(s"$key-$n").flatMap(showResult(n)))
val prog =
RedisPipeline(cmd)
.exec(operations)
.flatMap {
case _ ~: _ ~: res1 ~: _ ~: _ ~: res2 ~: HNil =>
putStrLn(s"res1: $res1, res2: $res2")
}
.onError {
case PipelineError =>
putStrLn("[Error] - Pipeline failed")
case _: TimeoutException =>
putStrLn("[Error] - Timeout")
}

RedisPipeline(cmd).run(setters) *> IO.sleep(2.seconds) *> getters
getters >> prog >> getters >> putStrLn("keep doing stuff...")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ object RedisTransactionsDemo extends LoggerIOApp {

commandsApi
.use { cmd =>
val tx = RedisTransaction(cmd)

val getters =
cmd.get(key1).flatTap(showResult(key1)) *>
cmd.get(key2).flatTap(showResult(key2))
Expand All @@ -59,7 +57,8 @@ object RedisTransactionsDemo extends LoggerIOApp {

//type Res = Unit :: Unit :: Option[String] :: Unit :: Unit :: Option[String] :: HNil
val prog =
tx.exec(operations)
RedisTransaction(cmd)
.exec(operations)
.flatMap {
case _ ~: _ ~: res1 ~: _ ~: _ ~: res2 ~: HNil =>
putStrLn(s"res1: $res1, res2: $res2")
Expand Down
48 changes: 31 additions & 17 deletions site/docs/pipelining.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,13 @@ Use [pipelining](https://redis.io/topics/pipelining) to speed up your queries by

`redis4cats` provides a `RedisPipeline` utility that models this behavior with some guarantees described below:

- `acquire`: disable autoflush.
- `use`: send a bunch of commands defined as `F[A]`.
- `acquire`: disable autoflush and send a bunch of commands defined as a custom `HList`.
- `release`: either flush commands on success or log error on failure / cancellation.
- `guarantee`: re-enable autoflush.

### RedisPipeline usage

The API for disabling / enabling autoflush and flush commands manually is available for you to use but since the pattern
is so common it is recommended to just use `RedisPipeline`. You can create a pipeline by passing the commands API as a parameter and invoke the `run` function given the set of commands you wish to send to the server.
The API for disabling / enabling autoflush and flush commands manually is available for you to use but since the pattern is so common it is recommended to just use `RedisPipeline`. You can create a pipeline by passing the commands API as a parameter and invoke the `exec` function given the set of commands you wish to send to the server.

Note that every command has to be forked (`.start`) because the commands need to be sent to the server in an asynchronous way but no response will be received until the commands are successfully flushed. Also, it is not possible to sequence commands (`flatMap`) that are part of a pipeline. Every command has to be atomic and independent of previous results.

Expand All @@ -43,30 +41,46 @@ val commandsApi: Resource[IO, RedisCommands[IO, String, String]] = {
import cats.effect.IO
import cats.implicits._
import dev.profunktor.redis4cats._
import dev.profunktor.redis4cats.hlist._
import dev.profunktor.redis4cats.pipeline._
import java.util.concurrent.TimeoutException
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext

implicit val timer = IO.timer(ExecutionContext.global)

def putStrLn(str: String): IO[Unit] = IO(println(str))

val key = "testp"
val key1 = "testp1"
val key2 = "testp2"

val showResult: Int => Option[String] => IO[Unit] = n =>
_.fold(putStrLn(s"Not found key $key-$n"))(s => putStrLn(s))
val showResult: String => Option[String] => IO[Unit] = key =>
_.fold(putStrLn(s"Not found key: $key"))(s => putStrLn(s"$key: $s"))

commandsApi.use { cmd => // RedisCommands[IO, String, String]
def traversal(f: Int => IO[Unit]): IO[Unit] =
List.range(0, 50).traverse(f).void

val setters: IO[Unit] =
traversal(n => cmd.set(s"$key-$n", (n * 2).toString).start.void)

val getters: IO[Unit] =
traversal(n => cmd.get(s"$key-$n").flatMap(showResult(n)))

RedisPipeline(cmd).run(setters) *> IO.sleep(2.seconds) *> getters
val getters =
cmd.get(key1).flatTap(showResult(key1)) *>
cmd.get(key2).flatTap(showResult(key2))

val operations =
cmd.set(key1, "noop") :: cmd.set(key2, "windows") :: cmd.get(key1) ::
cmd.set(key1, "nix") :: cmd.set(key2, "linux") :: cmd.get(key1) :: HNil

val prog =
RedisPipeline(cmd)
.exec(operations)
.flatMap {
case _ ~: _ ~: res1 ~: _ ~: _ ~: res2 ~: HNil =>
putStrLn(s"res1: $res1, res2: $res2")
}
.onError {
case PipelineError =>
putStrLn("[Error] - Pipeline failed")
case _: TimeoutException =>
putStrLn("[Error] - Timeout")
}

getters >> prog >> getters >> putStrLn("keep doing stuff...")
}
```

0 comments on commit 5b9629b

Please sign in to comment.