Skip to content

Commit

Permalink
Merge pull request #433 from profunktor/fix/pipelining-tests-and-docs
Browse files Browse the repository at this point in the history
Fix pipelining test + add caveats documentation
  • Loading branch information
gvolpe authored Dec 6, 2020
2 parents 833a967 + fd04236 commit d3b8bcd
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class RedisSpec extends Redis4CatsFunSuite(false) with TestScenarios {

test("connection api")(withRedis(connectionScenario))

test("pipelining".flaky)(withRedis(pipelineScenario))
test("pipelining")(withRedis(pipelineScenario))

test("server")(withRedis(serverScenario))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package dev.profunktor.redis4cats

import java.time.Instant
import java.util.concurrent.TimeoutException

import cats.data.NonEmptyList
import cats.effect._
Expand All @@ -25,7 +26,7 @@ import dev.profunktor.redis4cats.data.KeyScanCursor
import dev.profunktor.redis4cats.effect.Log.NoOp._
import dev.profunktor.redis4cats.effects._
import dev.profunktor.redis4cats.hlist._
import dev.profunktor.redis4cats.pipeline.RedisPipeline
import dev.profunktor.redis4cats.pipeline.{ PipelineError, RedisPipeline }
import dev.profunktor.redis4cats.transactions.RedisTransaction
import io.lettuce.core.GeoArgs
import munit.FunSuite
Expand Down Expand Up @@ -378,20 +379,33 @@ trait TestScenarios { self: FunSuite =>
def pipelineScenario(cmd: RedisCommands[IO, String, String]): IO[Unit] = {
val key1 = "testp1"
val key2 = "testp2"
val key3 = "testp3"

val operations =
cmd.set(key1, "osx") :: cmd.set(key2, "windows") :: cmd.get(key1) :: cmd.sIsMember("foo", "bar") ::
cmd.set(key1, "nix") :: cmd.set(key2, "linux") :: cmd.get(key1) :: HNil

RedisPipeline(cmd).exec(operations).map {
case _ ~: _ ~: res1 ~: res2 ~: _ ~: _ ~: res3 ~: HNil =>
assert(res1.contains("osx"))
assert(!res2)
assert(res3.contains("nix"))
case tr =>
fail(s"Unexpected result: $tr")
}
cmd.set(key1, "osx") :: cmd.get(key3) :: cmd.set(key2, "linux") :: cmd.sIsMember("foo", "bar") :: HNil

val runPipeline =
RedisPipeline(cmd)
.filterExec(operations)
.map {
case res1 ~: res2 ~: HNil =>
assertEquals(res1, Some("3"))
assert(!res2)
}
.onError {
case PipelineError => fail("[Error] - Pipeline failed")
case _: TimeoutException => fail("[Error] - Timeout")
}

for {
_ <- cmd.set(key3, "3")
_ <- runPipeline
v1 <- cmd.get(key1)
v2 <- cmd.get(key2)
} yield {
assertEquals(v1, Some("osx"))
assertEquals(v2, Some("linux"))
}
}

// With the current implementation (see `Runner#getTxDelay`), we cannot guarantee the commands
Expand Down
63 changes: 45 additions & 18 deletions site/docs/pipelining.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,20 @@ Use [pipelining](https://redis.io/topics/pipelining) to speed up your queries by
- `release`: either flush commands on success or log error on failure / cancellation.
- `guarantee`: re-enable autoflush.

## Caveats

⚠️ **Pipelining shares the same asynchronous implementation of transactions, meaning the order of the commands cannot be guaranteed.** ⚠️

This statement means that given the following set of operations.

```scala
val operations =
cmd.set(key1, "osx") :: cmd.set(key2, "linux") :: cmd.get(key1) ::
cmd.set(key1, "bar") :: cmd.set(key2, "foo") :: cmd.get(key1) :: HNil
```

The result of those two `get` operations will not be deterministic.

### RedisPipeline usage

The API for disabling / enabling autoflush and flush commands manually is available for you to use but since the pattern is so common it is recommended to just use `RedisPipeline`. You can create a pipeline by passing the commands API as a parameter and invoke the `exec` function (or `filterExec`) given the set of commands you wish to send to the server.
Expand Down Expand Up @@ -53,32 +67,45 @@ def putStrLn(str: String): IO[Unit] = IO(println(str))

val key1 = "testp1"
val key2 = "testp2"
val key3 = "testp3"

val showResult: String => Option[String] => IO[Unit] = key =>
_.fold(putStrLn(s"Not found key: $key"))(s => putStrLn(s"$key: $s"))

commandsApi.use { cmd => // RedisCommands[IO, String, String]
val getters =
cmd.get(key1).flatTap(showResult(key1)) *>
cmd.get(key2).flatTap(showResult(key2))

val operations =
cmd.set(key1, "noop") :: cmd.set(key2, "windows") :: cmd.get(key1) ::
cmd.set(key1, "nix") :: cmd.set(key2, "linux") :: cmd.get(key1) :: HNil
cmd.get(key1).flatTap(showResult(key1)) >>
cmd.get(key2).flatTap(showResult(key2)) >>
cmd.get(key3).flatTap(showResult(key3))

val operations =
cmd.set(key1, "osx") :: cmd.get(key3) :: cmd.set(key2, "linux") :: cmd.sIsMember("foo", "bar") :: HNil

val runPipeline =
RedisPipeline(cmd)
.filterExec(operations)
.map {
case res1 ~: res2 ~: HNil =>
assert(res1.contains("3"))
assert(!res2)
}
.onError {
case PipelineError =>
putStrLn("[Error] - Pipeline failed")
case _: TimeoutException =>
putStrLn("[Error] - Timeout")
}

val prog =
RedisPipeline(cmd)
.filterExec(operations)
.flatMap {
case res1 ~: res2 ~: HNil =>
putStrLn(s"res1: $res1, res2: $res2")
}
.onError {
case PipelineError =>
putStrLn("[Error] - Pipeline failed")
case _: TimeoutException =>
putStrLn("[Error] - Timeout")
}
for {
_ <- cmd.set(key3, "3")
_ <- runPipeline
v1 <- cmd.get(key1)
v2 <- cmd.get(key2)
} yield {
assert(v1.contains("osx"))
assert(v2.contains("linux"))
}

getters >> prog >> getters >> putStrLn("keep doing stuff...")
}
Expand Down

0 comments on commit d3b8bcd

Please sign in to comment.