Skip to content

Commit

Permalink
Adding support for single-shard transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
gvolpe committed May 14, 2019
1 parent 5ed302d commit 1da67e1
Show file tree
Hide file tree
Showing 18 changed files with 161 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ package dev.profunktor.redis4cats.connection

import cats.effect.{ Concurrent, ContextShift, Resource, Sync }
import cats.implicits._
import dev.profunktor.redis4cats.domain.{ LiveRedisClusterClient, RedisClusterClient }
import dev.profunktor.redis4cats.domain.{ LiveRedisClusterClient, NodeId, RedisClusterClient }
import dev.profunktor.redis4cats.effect.{ JRFuture, Log }
import io.lettuce.core.{ RedisURI => JRedisURI }
import io.lettuce.core.cluster.{ RedisClusterClient => JClusterClient }
import io.lettuce.core.cluster.{ SlotHash, RedisClusterClient => JClusterClient }
import io.lettuce.core.cluster.models.partitions.{ Partitions => JPartitions }

import scala.collection.JavaConverters._

Expand Down Expand Up @@ -53,4 +54,15 @@ object RedisClusterClient {
Resource.make(acquire)(release)
}

def nodeId[F[_]: Sync](
client: RedisClusterClient,
keyName: String
): F[NodeId] =
Sync[F].delay(SlotHash.getSlot(keyName)).flatMap { slot =>
partitions(client).map(_.getPartitionBySlot(slot).getNodeId).map(NodeId)
}

def partitions[F[_]: Sync](client: RedisClusterClient): F[JPartitions] =
Sync[F].delay(client.underlying.getPartitions())

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package dev.profunktor.redis4cats.connection

import cats.effect.{ Concurrent, ContextShift, Sync }
import cats.syntax.all._
import dev.profunktor.redis4cats.domain.NodeId
import dev.profunktor.redis4cats.effect.JRFuture
import io.lettuce.core.api.StatefulRedisConnection
import io.lettuce.core.api.async.RedisAsyncCommands
Expand All @@ -28,22 +29,30 @@ private[redis4cats] trait RedisConnection[F[_], K, V] {
def async: F[RedisAsyncCommands[K, V]]
def clusterAsync: F[RedisClusterAsyncCommands[K, V]]
def close: F[Unit]
def byNode(nodeId: NodeId): F[RedisAsyncCommands[K, V]]
}

private[redis4cats] class RedisStatefulConnection[F[_]: Concurrent: ContextShift, K, V](
conn: StatefulRedisConnection[K, V]
) extends RedisConnection[F, K, V] {
override def async: F[RedisAsyncCommands[K, V]] = Sync[F].delay(conn.async())
override def clusterAsync: F[RedisClusterAsyncCommands[K, V]] =
def async: F[RedisAsyncCommands[K, V]] = Sync[F].delay(conn.async())
def clusterAsync: F[RedisClusterAsyncCommands[K, V]] =
Sync[F].raiseError(new Exception("Operation not supported"))
def close: F[Unit] = JRFuture.fromCompletableFuture(Sync[F].delay(conn.closeAsync())).void
def byNode(nodeId: NodeId): F[RedisAsyncCommands[K, V]] =
Sync[F].raiseError(new Exception("Operation not supported"))
override def close: F[Unit] = JRFuture.fromCompletableFuture(Sync[F].delay(conn.closeAsync())).void
}

private[redis4cats] class RedisStatefulClusterConnection[F[_]: Concurrent: ContextShift, K, V](
conn: StatefulRedisClusterConnection[K, V]
) extends RedisConnection[F, K, V] {
override def async: F[RedisAsyncCommands[K, V]] =
def async: F[RedisAsyncCommands[K, V]] =
Sync[F].raiseError(new Exception("Transactions are not supported on a cluster"))
override def clusterAsync: F[RedisClusterAsyncCommands[K, V]] = Sync[F].delay(conn.async())
override def close: F[Unit] = JRFuture.fromCompletableFuture(Sync[F].delay(conn.closeAsync())).void
def clusterAsync: F[RedisClusterAsyncCommands[K, V]] = Sync[F].delay(conn.async())
def close: F[Unit] =
JRFuture.fromCompletableFuture(Sync[F].delay(conn.closeAsync())).void
def byNode(nodeId: NodeId): F[RedisAsyncCommands[K, V]] =
JRFuture.fromCompletableFuture(Sync[F].delay(conn.getConnectionAsync(nodeId.value))).flatMap { stateful =>
Sync[F].delay(stateful.async())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ object domain {
}
case class LiveRedisCodec[K, V](underlying: JCodec[K, V]) extends RedisCodec[K, V]

case class NodeId(value: String) extends AnyVal

object RedisCodec {
val Ascii = LiveRedisCodec(StringCodec.ASCII)
val Utf8 = LiveRedisCodec(StringCodec.UTF8)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import dev.profunktor.redis4cats.effect.{ JRFuture, Log }
import dev.profunktor.redis4cats.effects._
import io.lettuce.core.{ Limit => JLimit, Range => JRange, RedisURI => JRedisURI }
import io.lettuce.core.{ GeoArgs, GeoRadiusStoreArgs, GeoWithin, ScoredValue, ZAddArgs, ZStoreArgs }
import io.lettuce.core.api.async.RedisAsyncCommands
import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.FiniteDuration
Expand Down Expand Up @@ -63,6 +64,29 @@ object Redis {
(acquire, release)
}

private[redis4cats] def acquireAndReleaseClusterByNode[F[_]: Concurrent: ContextShift: Log, K, V](
client: RedisClusterClient,
codec: RedisCodec[K, V],
nodeId: NodeId
): (F[BaseRedis[F, K, V]], BaseRedis[F, K, V] => F[Unit]) = {
val acquire = JRFuture
.fromCompletableFuture {
Sync[F].delay(client.underlying.connectAsync[K, V](codec.underlying))
}
.map { c =>
new BaseRedis[F, K, V](new RedisStatefulClusterConnection(c), cluster = true) {
override def async: F[RedisClusterAsyncCommands[K, V]] =
if (cluster) conn.byNode(nodeId).widen[RedisClusterAsyncCommands[K, V]]
else conn.async.widen[RedisClusterAsyncCommands[K, V]]
}
}

val release: BaseRedis[F, K, V] => F[Unit] = c =>
Log[F].info(s"Releasing single-shard cluster Commands connection: ${client.underlying}") *> c.conn.close

(acquire, release)
}

def apply[F[_]: Concurrent: ContextShift: Log, K, V](
client: RedisClient,
codec: RedisCodec[K, V],
Expand All @@ -80,6 +104,15 @@ object Redis {
Resource.make(acquire)(release).map(_.asInstanceOf[RedisCommands[F, K, V]])
}

def clusterByNode[F[_]: Concurrent: ContextShift: Log, K, V](
clusterClient: RedisClusterClient,
codec: RedisCodec[K, V],
nodeId: NodeId
): Resource[F, RedisCommands[F, K, V]] = {
val (acquire, release) = acquireAndReleaseClusterByNode(clusterClient, codec, nodeId)
Resource.make(acquire)(release).map(_.asInstanceOf[RedisCommands[F, K, V]])
}

def masterSlave[F[_]: Concurrent: ContextShift: Log, K, V](
conn: RedisMasterSlaveConnection[K, V]
): F[RedisCommands[F, K, V]] =
Expand All @@ -96,45 +129,45 @@ private[redis4cats] class BaseRedis[F[_]: ContextShift, K, V](

import scala.collection.JavaConverters._

private val async: F[RedisClusterAsyncCommands[K, V]] =
def async: F[RedisClusterAsyncCommands[K, V]] =
if (cluster) conn.clusterAsync else conn.async.widen[RedisClusterAsyncCommands[K, V]]

override def del(key: K*): F[Unit] =
def del(key: K*): F[Unit] =
JRFuture {
async.flatMap(c => F.delay(c.del(key: _*)))
}.void

override def expire(key: K, expiresIn: FiniteDuration): F[Unit] =
def expire(key: K, expiresIn: FiniteDuration): F[Unit] =
JRFuture {
async.flatMap(c => F.delay(c.expire(key, expiresIn.toSeconds)))
}.void

/******************************* Transactions API **********************************/
// When in a cluster, transactions should run against a single node, therefore we use `conn.async` instead of `conn.clusterAsync`.

override def multi: F[Unit] =
def multi: F[Unit] =
JRFuture {
conn.async.flatMap(c => F.delay(c.multi()))
async.flatMap(c => F.delay(c.asInstanceOf[RedisAsyncCommands[K, V]].multi()))
}.void

def exec: F[Unit] =
JRFuture {
conn.async.flatMap(c => F.delay(c.exec()))
async.flatMap(c => F.delay(c.asInstanceOf[RedisAsyncCommands[K, V]].exec()))
}.void

def discard: F[Unit] =
JRFuture {
conn.async.flatMap(c => F.delay(c.discard()))
async.flatMap(c => F.delay(c.asInstanceOf[RedisAsyncCommands[K, V]].discard()))
}.void

def watch(keys: K*): F[Unit] =
JRFuture {
conn.async.flatMap(c => F.delay(c.watch(keys: _*)))
async.flatMap(c => F.delay(c.asInstanceOf[RedisAsyncCommands[K, V]].watch(keys: _*)))
}.void

def unwatch: F[Unit] =
JRFuture {
conn.async.flatMap(c => F.delay(c.unwatch()))
async.flatMap(c => F.delay(c.asInstanceOf[RedisAsyncCommands[K, V]].unwatch()))
}.void

/******************************* Strings API **********************************/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import fs2.{ Pipe, Stream }
import scala.concurrent.duration._
import scala.util.Random

object Fs2PubSubDemo extends LoggerIOApp {
object PubSubDemo extends LoggerIOApp {

import Demo._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import fs2.Stream
import scala.concurrent.duration._
import scala.util.Random

object Fs2PublisherDemo extends LoggerIOApp {
object PublisherDemo extends LoggerIOApp {

import Demo._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import dev.profunktor.redis4cats.connection._
import dev.profunktor.redis4cats.effect.Log
import dev.profunktor.redis4cats.interpreter.Redis

object Fs2RedisClusterStringsDemo extends LoggerIOApp {
object RedisClusterStringsDemo extends LoggerIOApp {

import Demo._

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright 2018-2019 ProfunKtor
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package dev.profunktor.redis4cats

import cats.effect.{ IO, Resource }
import cats.implicits._
import dev.profunktor.redis4cats.algebra.RedisCommands
import dev.profunktor.redis4cats.connection._
import dev.profunktor.redis4cats.domain.RedisClusterClient
import dev.profunktor.redis4cats.effect.Log
import dev.profunktor.redis4cats.interpreter.Redis
import dev.profunktor.redis4cats.transactions._

object RedisClusterTransactionsDemo extends LoggerIOApp {

import Demo._

def program(implicit log: Log[IO]): IO[Unit] = {
val key1 = "test1"

val showResult: String => Option[String] => IO[Unit] = key =>
_.fold(putStrLn(s"Not found key: $key"))(s => putStrLn(s))

val commandsApi: Resource[IO, (RedisClusterClient, RedisCommands[IO, String, String])] =
for {
uri <- Resource.liftF(RedisURI.make[IO](redisClusterURI))
client <- RedisClusterClient[IO](uri)
redis <- Redis.cluster[IO, String, String](client, stringCodec)
} yield client -> redis

commandsApi
.use {
case (client, cmd) =>
val nodeCmdResource =
for {
_ <- Resource.liftF(cmd.set(key1, "empty"))
nodeId <- Resource.liftF(RedisClusterClient.nodeId[IO](client, key1))
nodeCmd <- Redis.clusterByNode[IO, String, String](client, stringCodec, nodeId)
} yield nodeCmd

// Transaction runs in a single shard, where "key1" is stored
nodeCmdResource.use { nodeCmd =>
val tx = RedisTransaction(nodeCmd)

val getter = cmd.get(key1).flatTap(showResult(key1))
val setter = cmd.set(key1, "foo").start

val failedSetter =
cmd.set(key1, "qwe").start *>
IO.raiseError(new Exception("boom"))

val tx1 = tx.run(setter)
val tx2 = tx.run(failedSetter)

getter *> tx1 *> tx2.attempt *> getter.void
}
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import dev.profunktor.redis4cats.effects._
import dev.profunktor.redis4cats.interpreter.Redis
import io.lettuce.core.GeoArgs

object Fs2RedisGeoDemo extends LoggerIOApp {
object RedisGeoDemo extends LoggerIOApp {

import Demo._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import dev.profunktor.redis4cats.connection._
import dev.profunktor.redis4cats.effect.Log
import dev.profunktor.redis4cats.interpreter.Redis

object Fs2RedisHashesDemo extends LoggerIOApp {
object RedisHashesDemo extends LoggerIOApp {

import Demo._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import dev.profunktor.redis4cats.connection._
import dev.profunktor.redis4cats.effect.Log
import dev.profunktor.redis4cats.interpreter.Redis

object Fs2RedisListsDemo extends LoggerIOApp {
object RedisListsDemo extends LoggerIOApp {

import Demo._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import dev.profunktor.redis4cats.domain.{ ReadFrom, RedisMasterSlaveConnection }
import dev.profunktor.redis4cats.effect.Log
import dev.profunktor.redis4cats.interpreter.Redis

object Fs2RedisMasterSlaveStringsDemo extends LoggerIOApp {
object RedisMasterSlaveStringsDemo extends LoggerIOApp {

import Demo._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import dev.profunktor.redis4cats.connection._
import dev.profunktor.redis4cats.effect.Log
import dev.profunktor.redis4cats.interpreter.Redis

object Fs2RedisSetsDemo extends LoggerIOApp {
object RedisSetsDemo extends LoggerIOApp {

import Demo._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import dev.profunktor.redis4cats.effect.Log
import dev.profunktor.redis4cats.effects.{ Score, ScoreWithValue, ZRange }
import dev.profunktor.redis4cats.interpreter.Redis

object Fs2RedisSortedSetsDemo extends LoggerIOApp {
object RedisSortedSetsDemo extends LoggerIOApp {

import Demo._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import dev.profunktor.redis4cats.connection._
import dev.profunktor.redis4cats.effect.Log
import dev.profunktor.redis4cats.interpreter.Redis

object Fs2RedisStringsDemo extends LoggerIOApp {
object RedisStringsDemo extends LoggerIOApp {

import Demo._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
package dev.profunktor.redis4cats

import cats.effect.{ IO, Resource }
import cats.instances.list._
import cats.syntax.all._
import cats.implicits._
import dev.profunktor.redis4cats.algebra.RedisCommands
import dev.profunktor.redis4cats.connection._
import dev.profunktor.redis4cats.effect.Log
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import fs2.Stream
import scala.concurrent.duration._
import scala.util.Random

object Fs2StreamingDemo extends LoggerIOApp {
object StreamingDemo extends LoggerIOApp {

import Demo._

Expand Down
2 changes: 1 addition & 1 deletion version.sbt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version in ThisBuild := "0.8.0"
version in ThisBuild := "0.8.1"

0 comments on commit 1da67e1

Please sign in to comment.