Merge pull request #444 from profunktor/transactions/replacing-internal-delay-with-deferred

Transactions: replacing internal delay with synchronizer
gvolpe authored Dec 25, 2020
2 parents 0f8b8dc + bf14a14 commit ec5511f
Showing 7 changed files with 90 additions and 106 deletions.
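
The change in a nutshell: instead of sleeping for a configurable `REDIS4CATS_TX_DELAY` before letting `EXEC` run, the runner now opens a gate (`Deferred`) once a counter (`Ref`) confirms that every queued command has been dispatched. A minimal sketch of that synchronizer idea, assuming cats-effect 2.x (`cats.effect.concurrent.{ Deferred, Ref }`); `mkSynchronizer` is an illustrative name, not part of the library:

```scala
import cats.effect.Concurrent
import cats.effect.concurrent.{ Deferred, Ref }
import cats.syntax.all._

// Sketch only: a gate that opens once `total` participants have checked in,
// so EXEC is only sent after every queued command has been dispatched.
def mkSynchronizer[F[_]: Concurrent](total: Int): F[(F[Unit], F[Unit])] =
  (Deferred[F, Unit], Ref.of[F, Int](0)).tupled.map {
    case (gate, counter) =>
      val checkIn: F[Unit] =
        counter.modify {
          case n if n === total - 1 => (n + 1, gate.complete(()))
          case n                    => (n + 1, Concurrent[F].unit)
        }.flatten
      (checkIn, gate.get) // (run after dispatching each command, await before EXEC)
  }
```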
@@ -37,6 +37,15 @@ object hlist {
}
go(this, HNil)
}

def size: Int = {
def go(ys: HList, acc: Int): Int =
ys match {
case HNil => acc
case HCons(_, t) => go(t, acc + 1)
}
go(this, 0)
}
}

final case class HCons[+H, +Tail <: HList](head: H, tail: Tail) extends HList
@@ -61,10 +61,12 @@ class HListSpec extends FunSuite {
test("Conversion from standard list") {
val lt = List("a", "b", "c")
val hl = "a" :: "b" :: "c" :: HNil
assert(hl == HList.fromList(lt))
assertEquals[HList, HList](hl, HList.fromList(lt))
assertEquals(hl.size, lt.size)

val el = List.empty[Int]
assert(HNil == HList.fromList(el))
assertEquals[HList, HList](HNil, HList.fromList(el))
assertEquals(HNil.size, el.size)
}

}
@@ -16,16 +16,18 @@

package dev.profunktor.redis4cats

import scala.util.control.NoStackTrace

import cats.Parallel
import cats.effect._
import dev.profunktor.redis4cats.effect.Log
import dev.profunktor.redis4cats.hlist._
import scala.util.control.NoStackTrace

object pipeline {

case object PipelineError extends NoStackTrace

case class RedisPipeline[F[_]: Concurrent: Log: Timer, K, V](
case class RedisPipeline[F[_]: Concurrent: Log: Parallel: Timer, K, V](
cmd: RedisCommands[F, K, V]
) {

104 changes: 44 additions & 60 deletions modules/effects/src/main/scala/dev/profunktor/redis4cats/runner.scala
@@ -16,15 +16,17 @@

package dev.profunktor.redis4cats

import java.util.UUID

import scala.concurrent.duration._

import cats.Parallel
import cats.effect._
import cats.effect.concurrent.Deferred
import cats.effect.concurrent.{ Deferred, Ref }
import cats.effect.implicits._
import cats.syntax.all._
import dev.profunktor.redis4cats.effect.Log
import dev.profunktor.redis4cats.hlist._
import java.util.UUID
import java.util.concurrent.TimeUnit
import scala.concurrent.duration._

object Runner {
type CancelFibers[F[_]] = Throwable => F[Unit]
@@ -38,79 +40,61 @@ object Runner {
mkError: () => Throwable
)

def apply[F[_]: Concurrent: Log: Timer]: RunnerPartiallyApplied[F] =
def apply[F[_]: Concurrent: Log: Parallel: Timer]: RunnerPartiallyApplied[F] =
new RunnerPartiallyApplied[F]
}

private[redis4cats] class RunnerPartiallyApplied[F[_]: Concurrent: Log: Timer] {
private[redis4cats] class RunnerPartiallyApplied[F[_]: Concurrent: Log: Parallel: Timer] {

def filterExec[T <: HList, R <: HList, S <: HList](ops: Runner.Ops[F])(commands: T)(
implicit w: Witness.Aux[T, R],
f: Filter.Aux[R, S]
): F[S] = exec[T, R](ops)(commands).map(_.filterUnit)

/**
* This is unfortunately the easiest way to get optimistic locking to work in a deterministic way. Details follows.
*
* Every transactional command is forked, yielding a Fiber that is part of an HList. Modeling the transaction as
* a `Resource`, we spawn all the fibers representing the commands and simulate a bit of delay to let the underlying
* `ExecutionContext` schedule them before be proceed with the finalizer of the transaction, which always means
* calling EXEC in a successful transaction.
*
* Without this tiny delay, we sometimes get into a race condition where not all the fibers have been scheduled
* (meaning they haven't yet reached Redis), and where EXEC reaches the server before all the commands, making
* the transaction result successful but not always deterministic.
*
* The `OptimisticLockSuite` tests the determinism of this implementation.
*
* A proper way to implement this might be to manage our own `ExecutionContext` so we could tell exactly when all the
* fibers have been scheduled, and only then trigger the EXEC command. This would change the API, though, and it is
* not as easy as it sounds but we can try and experiment with this in the future, if the time allows it.
*/
private def getTxDelay: F[FiniteDuration] =
F.delay {
Duration(sys.env.getOrElse("REDIS4CATS_TX_DELAY", "50.millis")) match {
case fd: FiniteDuration => fd
case x => FiniteDuration(x.toMillis, TimeUnit.MILLISECONDS)
}
}

def exec[T <: HList, R <: HList](ops: Runner.Ops[F])(commands: T)(implicit w: Witness.Aux[T, R]): F[R] =
(Deferred[F, Either[Throwable, w.R]], F.delay(UUID.randomUUID), getTxDelay).tupled.flatMap {
case (promise, uuid, txDelay) =>
def cancelFibers[A](fibs: HList)(after: F[Unit])(err: Throwable): F[Unit] =
joinOrCancel(fibs, HNil)(false).void.guarantee(after) >> promise.complete(err.asLeft)
(Deferred[F, Either[Throwable, w.R]], F.delay(UUID.randomUUID)).tupled.flatMap {
case (promise, uuid) =>
def cancelFibers[A](fibs: HList)(err: Throwable): F[Unit] =
joinOrCancel(fibs, HNil)(false) >> promise.complete(err.asLeft)

def onErrorOrCancelation(fibs: HList): F[Unit] =
cancelFibers(fibs)(ops.onError)(ops.mkError())

F.debug(s"${ops.name} started - ID: $uuid") >>
Resource
.makeCase(ops.mainCmd >> runner(commands, HNil)) {
case (fibs, ExitCase.Completed) =>
for {
_ <- F.debug(s"${ops.name} completed - ID: $uuid")
_ <- ops.onComplete(cancelFibers(fibs)(F.unit))
tr <- joinOrCancel(fibs, HNil)(true)
// Casting here is fine since we have a `Witness` that proves this true
_ <- promise.complete(tr.asInstanceOf[w.R].asRight)
} yield ()
case (fibs, ExitCase.Error(e)) =>
F.error(s"${ops.name} failed: ${e.getMessage} - ID: $uuid") >>
onErrorOrCancelation(fibs)
case (fibs, ExitCase.Canceled) =>
F.error(s"${ops.name} canceled - ID: $uuid") >>
onErrorOrCancelation(fibs)
}
.use(_ => F.sleep(txDelay).void)
.guarantee(ops.afterCompletion) >> promise.get.rethrow.timeout(3.seconds)
cancelFibers(fibs)(ops.mkError()).guarantee(ops.onError)

(Deferred[F, Unit], Ref.of[F, Int](0)).tupled
.flatMap {
case (gate, counter) =>
// wait for commands to be scheduled
val synchronizer: F[Unit] =
counter.modify {
case n if n === (commands.size - 1) => n + 1 -> gate.complete(())
case n => n + 1 -> F.unit
}.flatten

F.debug(s"${ops.name} started - ID: $uuid") >>
(ops.mainCmd >> runner(synchronizer, commands, HNil))
.bracketCase(_ => gate.get) {
case (fibs, ExitCase.Completed) =>
for {
_ <- F.debug(s"${ops.name} completed - ID: $uuid")
_ <- ops.onComplete(cancelFibers(fibs))
r <- joinOrCancel(fibs, HNil)(true)
// Casting here is fine since we have a `Witness` that proves this true
_ <- promise.complete(r.asInstanceOf[w.R].asRight)
} yield ()
case (fibs, ExitCase.Error(e)) =>
F.error(s"${ops.name} failed: ${e.getMessage} - ID: $uuid") >> onErrorOrCancelation(fibs)
case (fibs, ExitCase.Canceled) =>
F.error(s"${ops.name} canceled - ID: $uuid") >> onErrorOrCancelation(fibs)
}
.guarantee(ops.afterCompletion) >> promise.get.rethrow.timeout(3.seconds)
}
}

// Forks every command in order
private def runner[H <: HList, G <: HList](ys: H, res: G): F[HList] =
private def runner[H <: HList, G <: HList](f: F[Unit], ys: H, res: G): F[HList] =
ys match {
case HNil => F.pure(res)
case HCons((h: F[_] @unchecked), t) => h.start.flatMap(fb => runner(t, fb :: res))
case HCons((h: F[_] @unchecked), t) => (h, f).parTupled.map(_._1).start.flatMap(fb => runner(f, t, fb :: res))
}

// Joins or cancels the fibers corresponding to previously executed commands
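
For reference, the forking pattern used by the new `runner` (pairing each command with the synchronizer check-in via `parTupled`) can be sketched in isolation. This is a standalone sketch, not the library code, and `dispatch` is an illustrative name:

```scala
import cats.Parallel
import cats.effect.{ Concurrent, Fiber }
import cats.effect.implicits._
import cats.syntax.all._

// Sketch only: run a command and the synchronizer check-in in parallel,
// keep the command's own result, and fork the pair onto its own fiber.
def dispatch[F[_]: Concurrent: Parallel, A](command: F[A], checkIn: F[Unit]): F[Fiber[F, A]] =
  (command, checkIn).parTupled.map(_._1).start
```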
@@ -16,19 +16,21 @@

package dev.profunktor.redis4cats

import scala.util.control.NoStackTrace

import cats.Parallel
import cats.effect._
import cats.syntax.all._
import dev.profunktor.redis4cats.effect.Log
import dev.profunktor.redis4cats.hlist._
import scala.util.control.NoStackTrace

object transactions {

sealed trait TransactionError extends NoStackTrace
case object TransactionAborted extends TransactionError
case object TransactionDiscarded extends TransactionError

case class RedisTransaction[F[_]: Concurrent: Log: Timer, K, V](
case class RedisTransaction[F[_]: Concurrent: Log: Parallel: Timer, K, V](
cmd: RedisCommands[F, K, V]
) {

@@ -197,7 +197,7 @@ trait TestScenarios { self: FunSuite =>
val key2 = "key2"
for {
x <- cmd.get(key1)
_ <- IO(assert(x.isEmpty))
_ <- IO(assertEquals(x, None))
exist1 <- cmd.exists(key1)
_ <- IO(assert(!exist1))
idletime1 <- cmd.objectIdletime(key1)
@@ -408,8 +408,6 @@ trait TestScenarios { self: FunSuite =>
}
}

// With the current implementation (see `Runner#getTxDelay`), we cannot guarantee the commands
// are executed in order with the async API but only that the execution is either successful or failed.
def transactionScenario(cmd: RedisCommands[IO, String, String]): IO[Unit] = {
val key1 = "txtest1"
val val1 = "osx"
@@ -419,24 +417,25 @@
val val3 = "linux"
val del1 = "deleteme"

val operations = cmd.set(key1, val1) :: cmd.set(key2, val2) :: cmd.set(key3, val3) :: cmd.del(del1) :: HNil
val operations = cmd.set(key2, val2) :: cmd.get(key1) :: cmd.set(key3, val3) :: cmd.del(del1) :: HNil

for {
_ <- cmd.set(del1, "foo")
_ <- RedisTransaction(cmd).exec(operations).void
v1 <- cmd.get(key1)
v2 <- cmd.get(key2)
v3 <- cmd.get(key3)
d1 <- cmd.exists(del1)
} yield {
assertEquals(v1, Some(val1))
assertEquals(v2, Some(val2))
assertEquals(v3, Some(val3))
assert(!d1)
}
cmd.set(del1, "foo") >> cmd.set(key1, val1) >>
RedisTransaction(cmd)
.filterExec(operations)
.map {
case res1 ~: res2 ~: HNil =>
assertEquals(res1, Some(val1))
assertEquals(res2, 1L)
}
.flatMap { _ =>
(cmd.get(key2), cmd.get(key3)).mapN {
case (x, y) =>
assertEquals(x, Some(val2))
assertEquals(y, Some(val3))
}
}
}

// cannot guarantee the order of execution with the async API
def transactionDoubleSetScenario(cmd: RedisCommands[IO, String, String]): IO[Unit] = {
val key = "txtest-double-set"

@@ -456,16 +455,7 @@ trait TestScenarios { self: FunSuite =>
val commands = cmd.set(key1, "v1") :: cmd.set(key2, "v2") :: cmd.set("tx-3", "v3") :: HNil

// We race it with a plain `IO.unit` so the transaction may or may not start at all but the result should be the same
val verifyKey1 =
IO.race(tx.exec(commands).attempt.void, IO.unit) >>
cmd.get(key1).map(assertEquals(_, None)) // no keys written

// We race it with a sleep to make sure the transaction gets time to start running
val verifyKey2 =
IO.race(tx.exec(commands).attempt.void, IO.sleep(20.millis).void) >>
cmd.get(key2).map(assertEquals(_, None)) // no keys written

verifyKey1 >> verifyKey2
IO.race(tx.exec(commands), IO.unit) >> cmd.get(key1).map(assertEquals(_, None)) // no keys written
}

def scriptsScenario(cmd: RedisCommands[IO, String, String]): IO[Unit] = {
19 changes: 7 additions & 12 deletions site/docs/transactions.md
@@ -15,15 +15,9 @@ Note that every command has to be forked (`.start`) because the commands need to

These are internals, though. All you need to care about is which commands you want to run as part of a transaction, and how to handle the possible errors and retry logic.
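
A minimal retry sketch for that error-handling part could look like this (one possible approach, not library code; `retryTx` and the retry count are illustrative), using the `TransactionError` cases exposed by `dev.profunktor.redis4cats.transactions`:

```scala
import cats.effect.IO
import dev.profunktor.redis4cats.transactions.{ TransactionAborted, TransactionDiscarded }

// Sketch only: retry a transaction a bounded number of times when it is
// aborted or discarded, re-raising any other error as-is.
def retryTx[A](run: IO[A], retries: Int): IO[A] =
  run.handleErrorWith {
    case TransactionAborted | TransactionDiscarded if retries > 0 =>
      retryTx(run, retries - 1)
    case other =>
      IO.raiseError(other)
  }
```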

##### Write-only commands

⚠️ Only writing commands such as `set`, `hset` and `del` are supported. Although using `get` as part of a transaction would compile, the result will be non-deterministic due to the asynchronous nature of the implementation. ⚠️

In the future, this might be supported but the only way to get it right is either to use the underlying synchronous API or to use a custom `ExecutionContext` so we can control the order of execution as well as when a command has been effectively dispatched. Arguably, though, the most common cases for transactional commands and optimistic locking involve *only writing* for which the existing API should be good enough.

##### Concurrent transactions

⚠️ if we want to run transactions concurrently, we need to acquire a connection per transaction (`RedisCommands`), as `MULTI` can not be called concurrently within the same connection, reason why it is recommended to share the same `RedisClient`. ⚠️
⚠️ In order to run transactions concurrently, you need to acquire a connection per transaction (`RedisCommands`), as `MULTI` cannot be called concurrently within the same connection. For such cases, it is recommended to share the same `RedisClient`. ⚠️
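
A rough sketch of that setup (assuming the usual implicit `Log`, `ContextShift` and `Timer` instances are in scope, as in the other examples on this page; `runTx` and `concurrentTxs` are illustrative names):

```scala
import cats.effect.IO
import cats.syntax.all._
import dev.profunktor.redis4cats.{ Redis, RedisCommands }
import dev.profunktor.redis4cats.connection.RedisClient
import dev.profunktor.redis4cats.data.RedisCodec
import dev.profunktor.redis4cats.hlist._
import dev.profunktor.redis4cats.transactions.RedisTransaction

// Sketch only: one RedisCommands (i.e. one dedicated connection) per
// transaction, all derived from the same shared RedisClient.
def runTx(client: RedisClient, i: Int): IO[Unit] =
  Redis[IO].fromClient(client, RedisCodec.Utf8).use { cmd =>
    RedisTransaction(cmd).exec(cmd.set(s"tx-key-$i", "value") :: HNil).void
  }

// Two transactions running concurrently, each on its own connection
def concurrentTxs(client: RedisClient): IO[Unit] =
  (runTx(client, 1), runTx(client, 2)).parTupled.void
```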

### Working with transactions

@@ -74,23 +68,24 @@ val showResult: String => Option[String] => IO[Unit] = key =>
commandsApi.use { cmd => // RedisCommands[IO, String, String]
val tx = RedisTransaction(cmd)

val setters = cmd.set(key3, "delete_me")
val setters = cmd.set(key2, "delete_me") >> cmd.set(key3, "foo")

val getters =
cmd.get(key1).flatTap(showResult(key1)) >>
cmd.get(key2).flatTap(showResult(key2))

// the commands type is fully inferred
// IO[Unit] :: IO[Long] :: IO[Option[String]] :: HNil
val commands = cmd.set(key1, "foo") :: cmd.del(key2) :: cmd.set(key3, "bar") :: HNil
val commands = cmd.set(key1, "foo") :: cmd.del(key2) :: cmd.get(key3) :: HNil

// the result type is inferred as well
// Long :: Option[String] :: HNil
val prog =
tx.filterExec(commands)
.flatMap {
case res ~: HNil =>
putStrLn(s"Key2 result: $res")
case res1 ~: res2 ~: HNil =>
putStrLn(s"Key2 result: $res1") >>
putStrLn(s"Key3 result: $res2")
}
.onError {
case TransactionAborted =>
@@ -135,7 +130,7 @@ commandsApi.use { cmd =>
}
```

You should never pass a transactional command: `MULTI`, `EXEC` or `DISCARD`.
You should never pass a transactional command: `MULTI`, `EXEC` or `DISCARD`. These commands are made available in case you want to handle transactions manually, which you should do *at your own risk*.

The following example will result in a successful transaction on Redis. Yet, the operation will end up raising the error passed as a command.

