Skip to content

Commit

Permalink
Safely disable transaction in a cluster.
Browse files Browse the repository at this point in the history
  • Loading branch information
gvolpe committed May 19, 2019
1 parent 1da67e1 commit 87adc43
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ import io.lettuce.core.api.StatefulRedisConnection
import io.lettuce.core.api.async.RedisAsyncCommands
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection
import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands
import scala.util.control.NoStackTrace

case class OperationNotSupported(value: String) extends NoStackTrace {
override def toString(): String = s"OperationNotSupported($value)"
}

private[redis4cats] trait RedisConnection[F[_], K, V] {
def async: F[RedisAsyncCommands[K, V]]
Expand All @@ -37,17 +42,19 @@ private[redis4cats] class RedisStatefulConnection[F[_]: Concurrent: ContextShift
) extends RedisConnection[F, K, V] {
def async: F[RedisAsyncCommands[K, V]] = Sync[F].delay(conn.async())
def clusterAsync: F[RedisClusterAsyncCommands[K, V]] =
Sync[F].raiseError(new Exception("Operation not supported"))
Sync[F].raiseError(OperationNotSupported("Running in a single node"))
def close: F[Unit] = JRFuture.fromCompletableFuture(Sync[F].delay(conn.closeAsync())).void
def byNode(nodeId: NodeId): F[RedisAsyncCommands[K, V]] =
Sync[F].raiseError(new Exception("Operation not supported"))
Sync[F].raiseError(OperationNotSupported("Running in a single node"))
}

private[redis4cats] class RedisStatefulClusterConnection[F[_]: Concurrent: ContextShift, K, V](
conn: StatefulRedisClusterConnection[K, V]
) extends RedisConnection[F, K, V] {
def async: F[RedisAsyncCommands[K, V]] =
Sync[F].raiseError(new Exception("Transactions are not supported on a cluster"))
Sync[F].raiseError(
OperationNotSupported("Transactions are not supported in a cluster. You must select a single node.")
)
def clusterAsync: F[RedisClusterAsyncCommands[K, V]] = Sync[F].delay(conn.async())
def close: F[Unit] =
JRFuture.fromCompletableFuture(Sync[F].delay(conn.closeAsync())).void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,27 +147,42 @@ private[redis4cats] class BaseRedis[F[_]: ContextShift, K, V](

def multi: F[Unit] =
JRFuture {
async.flatMap(c => F.delay(c.asInstanceOf[RedisAsyncCommands[K, V]].multi()))
async.flatMap {
case c: RedisAsyncCommands[K, V] => F.delay(c.multi())
case _ => conn.async.flatMap(c => F.delay(c.multi()))
}
}.void

def exec: F[Unit] =
JRFuture {
async.flatMap(c => F.delay(c.asInstanceOf[RedisAsyncCommands[K, V]].exec()))
async.flatMap {
case c: RedisAsyncCommands[K, V] => F.delay(c.exec())
case _ => conn.async.flatMap(c => F.delay(c.exec()))
}
}.void

def discard: F[Unit] =
JRFuture {
async.flatMap(c => F.delay(c.asInstanceOf[RedisAsyncCommands[K, V]].discard()))
async.flatMap {
case c: RedisAsyncCommands[K, V] => F.delay(c.discard())
case _ => conn.async.flatMap(c => F.delay(c.discard()))
}
}.void

def watch(keys: K*): F[Unit] =
JRFuture {
async.flatMap(c => F.delay(c.asInstanceOf[RedisAsyncCommands[K, V]].watch(keys: _*)))
async.flatMap {
case c: RedisAsyncCommands[K, V] => F.delay(c.watch(keys: _*))
case _ => conn.async.flatMap(c => F.delay(c.watch(keys: _*)))
}
}.void

def unwatch: F[Unit] =
JRFuture {
async.flatMap(c => F.delay(c.asInstanceOf[RedisAsyncCommands[K, V]].unwatch()))
async.flatMap {
case c: RedisAsyncCommands[K, V] => F.delay(c.unwatch())
case _ => conn.async.flatMap(c => F.delay(c.unwatch()))
}
}.void

/******************************* Strings API **********************************/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,22 +52,29 @@ object RedisClusterTransactionsDemo extends LoggerIOApp {
nodeCmd <- Redis.clusterByNode[IO, String, String](client, stringCodec, nodeId)
} yield nodeCmd

// Transaction runs in a single shard, where "key1" is stored
nodeCmdResource.use { nodeCmd =>
val tx = RedisTransaction(nodeCmd)
// Transactions are only supported on a single node
val notAllowed =
cmd.multi.bracket(_ => cmd.set(key1, "nope") *> cmd.exec)(_ => cmd.discard).handleErrorWith {
case e: OperationNotSupported => putStrLn(e)
}

val getter = cmd.get(key1).flatTap(showResult(key1))
val setter = cmd.set(key1, "foo").start
notAllowed *>
// Transaction runs in a single shard, where "key1" is stored
nodeCmdResource.use { nodeCmd =>
val tx = RedisTransaction(nodeCmd)

val failedSetter =
cmd.set(key1, "qwe").start *>
IO.raiseError(new Exception("boom"))
val getter = cmd.get(key1).flatTap(showResult(key1))
val setter = cmd.set(key1, "foo").start

val tx1 = tx.run(setter)
val tx2 = tx.run(failedSetter)
val failedSetter =
cmd.set(key1, "qwe").start *>
IO.raiseError(new Exception("boom"))

getter *> tx1 *> tx2.attempt *> getter.void
}
val tx1 = tx.run(setter)
val tx2 = tx.run(failedSetter)

getter *> tx1 *> tx2.attempt *> getter.void
}
}

}
Expand Down

0 comments on commit 87adc43

Please sign in to comment.