diff --git a/modules/core/src/main/scala/dev/profunktor/redis4cats/connection/RedisClusterClient.scala b/modules/core/src/main/scala/dev/profunktor/redis4cats/connection/RedisClusterClient.scala index 6fdf2879..572382b4 100644 --- a/modules/core/src/main/scala/dev/profunktor/redis4cats/connection/RedisClusterClient.scala +++ b/modules/core/src/main/scala/dev/profunktor/redis4cats/connection/RedisClusterClient.scala @@ -18,10 +18,11 @@ package dev.profunktor.redis4cats.connection import cats.effect.{ Concurrent, ContextShift, Resource, Sync } import cats.implicits._ -import dev.profunktor.redis4cats.domain.{ LiveRedisClusterClient, RedisClusterClient } +import dev.profunktor.redis4cats.domain.{ LiveRedisClusterClient, NodeId, RedisClusterClient } import dev.profunktor.redis4cats.effect.{ JRFuture, Log } import io.lettuce.core.{ RedisURI => JRedisURI } -import io.lettuce.core.cluster.{ RedisClusterClient => JClusterClient } +import io.lettuce.core.cluster.{ SlotHash, RedisClusterClient => JClusterClient } +import io.lettuce.core.cluster.models.partitions.{ Partitions => JPartitions } import scala.collection.JavaConverters._ @@ -53,4 +54,15 @@ object RedisClusterClient { Resource.make(acquire)(release) } + def nodeId[F[_]: Sync]( + client: RedisClusterClient, + keyName: String + ): F[NodeId] = + Sync[F].delay(SlotHash.getSlot(keyName)).flatMap { slot => + partitions(client).map(_.getPartitionBySlot(slot).getNodeId).map(NodeId) + } + + def partitions[F[_]: Sync](client: RedisClusterClient): F[JPartitions] = + Sync[F].delay(client.underlying.getPartitions()) + } diff --git a/modules/core/src/main/scala/dev/profunktor/redis4cats/connection/RedisConnection.scala b/modules/core/src/main/scala/dev/profunktor/redis4cats/connection/RedisConnection.scala index a2982cbd..671e80ae 100644 --- a/modules/core/src/main/scala/dev/profunktor/redis4cats/connection/RedisConnection.scala +++ b/modules/core/src/main/scala/dev/profunktor/redis4cats/connection/RedisConnection.scala @@ -18,32 +18,48 @@ package dev.profunktor.redis4cats.connection import cats.effect.{ Concurrent, ContextShift, Sync } import cats.syntax.all._ +import dev.profunktor.redis4cats.domain.NodeId import dev.profunktor.redis4cats.effect.JRFuture import io.lettuce.core.api.StatefulRedisConnection import io.lettuce.core.api.async.RedisAsyncCommands import io.lettuce.core.cluster.api.StatefulRedisClusterConnection import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands +import scala.util.control.NoStackTrace + +case class OperationNotSupported(value: String) extends NoStackTrace { + override def toString(): String = s"OperationNotSupported($value)" +} private[redis4cats] trait RedisConnection[F[_], K, V] { def async: F[RedisAsyncCommands[K, V]] def clusterAsync: F[RedisClusterAsyncCommands[K, V]] def close: F[Unit] + def byNode(nodeId: NodeId): F[RedisAsyncCommands[K, V]] } private[redis4cats] class RedisStatefulConnection[F[_]: Concurrent: ContextShift, K, V]( conn: StatefulRedisConnection[K, V] ) extends RedisConnection[F, K, V] { - override def async: F[RedisAsyncCommands[K, V]] = Sync[F].delay(conn.async()) - override def clusterAsync: F[RedisClusterAsyncCommands[K, V]] = - Sync[F].raiseError(new Exception("Operation not supported")) - override def close: F[Unit] = JRFuture.fromCompletableFuture(Sync[F].delay(conn.closeAsync())).void + def async: F[RedisAsyncCommands[K, V]] = Sync[F].delay(conn.async()) + def clusterAsync: F[RedisClusterAsyncCommands[K, V]] = + Sync[F].raiseError(OperationNotSupported("Running in a single node")) + def close: F[Unit] = JRFuture.fromCompletableFuture(Sync[F].delay(conn.closeAsync())).void + def byNode(nodeId: NodeId): F[RedisAsyncCommands[K, V]] = + Sync[F].raiseError(OperationNotSupported("Running in a single node")) } private[redis4cats] class RedisStatefulClusterConnection[F[_]: Concurrent: ContextShift, K, V]( conn: StatefulRedisClusterConnection[K, V] ) extends RedisConnection[F, K, V] { - override def async: F[RedisAsyncCommands[K, V]] = - Sync[F].raiseError(new Exception("Transactions are not supported on a cluster")) - override def clusterAsync: F[RedisClusterAsyncCommands[K, V]] = Sync[F].delay(conn.async()) - override def close: F[Unit] = JRFuture.fromCompletableFuture(Sync[F].delay(conn.closeAsync())).void + def async: F[RedisAsyncCommands[K, V]] = + Sync[F].raiseError( + OperationNotSupported("Transactions are not supported in a cluster. You must select a single node.") + ) + def clusterAsync: F[RedisClusterAsyncCommands[K, V]] = Sync[F].delay(conn.async()) + def close: F[Unit] = + JRFuture.fromCompletableFuture(Sync[F].delay(conn.closeAsync())).void + def byNode(nodeId: NodeId): F[RedisAsyncCommands[K, V]] = + JRFuture.fromCompletableFuture(Sync[F].delay(conn.getConnectionAsync(nodeId.value))).flatMap { stateful => + Sync[F].delay(stateful.async()) + } } diff --git a/modules/core/src/main/scala/dev/profunktor/redis4cats/domain.scala b/modules/core/src/main/scala/dev/profunktor/redis4cats/domain.scala index 3d6f02a8..4d6765af 100644 --- a/modules/core/src/main/scala/dev/profunktor/redis4cats/domain.scala +++ b/modules/core/src/main/scala/dev/profunktor/redis4cats/domain.scala @@ -51,6 +51,8 @@ object domain { } case class LiveRedisCodec[K, V](underlying: JCodec[K, V]) extends RedisCodec[K, V] + case class NodeId(value: String) extends AnyVal + object RedisCodec { val Ascii = LiveRedisCodec(StringCodec.ASCII) val Utf8 = LiveRedisCodec(StringCodec.UTF8) diff --git a/modules/effects/src/main/scala/dev/profunktor/redis4cats/interpreter/Redis.scala b/modules/effects/src/main/scala/dev/profunktor/redis4cats/interpreter/Redis.scala index 9ab54639..25c257e9 100644 --- a/modules/effects/src/main/scala/dev/profunktor/redis4cats/interpreter/Redis.scala +++ b/modules/effects/src/main/scala/dev/profunktor/redis4cats/interpreter/Redis.scala @@ -25,6 +25,7 @@ import dev.profunktor.redis4cats.effect.{ JRFuture, Log } import dev.profunktor.redis4cats.effects._ import io.lettuce.core.{ Limit => JLimit, Range => JRange, RedisURI => JRedisURI } import io.lettuce.core.{ GeoArgs, GeoRadiusStoreArgs, GeoWithin, ScoredValue, ZAddArgs, ZStoreArgs } +import io.lettuce.core.api.async.RedisAsyncCommands import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands import java.util.concurrent.TimeUnit import scala.concurrent.duration.FiniteDuration @@ -63,6 +64,29 @@ object Redis { (acquire, release) } + private[redis4cats] def acquireAndReleaseClusterByNode[F[_]: Concurrent: ContextShift: Log, K, V]( + client: RedisClusterClient, + codec: RedisCodec[K, V], + nodeId: NodeId + ): (F[BaseRedis[F, K, V]], BaseRedis[F, K, V] => F[Unit]) = { + val acquire = JRFuture + .fromCompletableFuture { + Sync[F].delay(client.underlying.connectAsync[K, V](codec.underlying)) + } + .map { c => + new BaseRedis[F, K, V](new RedisStatefulClusterConnection(c), cluster = true) { + override def async: F[RedisClusterAsyncCommands[K, V]] = + if (cluster) conn.byNode(nodeId).widen[RedisClusterAsyncCommands[K, V]] + else conn.async.widen[RedisClusterAsyncCommands[K, V]] + } + } + + val release: BaseRedis[F, K, V] => F[Unit] = c => + Log[F].info(s"Releasing single-shard cluster Commands connection: ${client.underlying}") *> c.conn.close + + (acquire, release) + } + def apply[F[_]: Concurrent: ContextShift: Log, K, V]( client: RedisClient, codec: RedisCodec[K, V], @@ -80,6 +104,15 @@ object Redis { Resource.make(acquire)(release).map(_.asInstanceOf[RedisCommands[F, K, V]]) } + def clusterByNode[F[_]: Concurrent: ContextShift: Log, K, V]( + clusterClient: RedisClusterClient, + codec: RedisCodec[K, V], + nodeId: NodeId + ): Resource[F, RedisCommands[F, K, V]] = { + val (acquire, release) = acquireAndReleaseClusterByNode(clusterClient, codec, nodeId) + Resource.make(acquire)(release).map(_.asInstanceOf[RedisCommands[F, K, V]]) + } + def masterSlave[F[_]: Concurrent: ContextShift: Log, K, V]( conn: RedisMasterSlaveConnection[K, V] ): F[RedisCommands[F, K, V]] = @@ -96,15 +129,15 @@ private[redis4cats] class BaseRedis[F[_]: ContextShift, K, V]( import scala.collection.JavaConverters._ - private val async: F[RedisClusterAsyncCommands[K, V]] = + def async: F[RedisClusterAsyncCommands[K, V]] = if (cluster) conn.clusterAsync else conn.async.widen[RedisClusterAsyncCommands[K, V]] - override def del(key: K*): F[Unit] = + def del(key: K*): F[Unit] = JRFuture { async.flatMap(c => F.delay(c.del(key: _*))) }.void - override def expire(key: K, expiresIn: FiniteDuration): F[Unit] = + def expire(key: K, expiresIn: FiniteDuration): F[Unit] = JRFuture { async.flatMap(c => F.delay(c.expire(key, expiresIn.toSeconds))) }.void @@ -112,29 +145,44 @@ private[redis4cats] class BaseRedis[F[_]: ContextShift, K, V]( /******************************* Transactions API **********************************/ // When in a cluster, transactions should run against a single node, therefore we use `conn.async` instead of `conn.clusterAsync`. - override def multi: F[Unit] = + def multi: F[Unit] = JRFuture { - conn.async.flatMap(c => F.delay(c.multi())) + async.flatMap { + case c: RedisAsyncCommands[K, V] => F.delay(c.multi()) + case _ => conn.async.flatMap(c => F.delay(c.multi())) + } }.void def exec: F[Unit] = JRFuture { - conn.async.flatMap(c => F.delay(c.exec())) + async.flatMap { + case c: RedisAsyncCommands[K, V] => F.delay(c.exec()) + case _ => conn.async.flatMap(c => F.delay(c.exec())) + } }.void def discard: F[Unit] = JRFuture { - conn.async.flatMap(c => F.delay(c.discard())) + async.flatMap { + case c: RedisAsyncCommands[K, V] => F.delay(c.discard()) + case _ => conn.async.flatMap(c => F.delay(c.discard())) + } }.void def watch(keys: K*): F[Unit] = JRFuture { - conn.async.flatMap(c => F.delay(c.watch(keys: _*))) + async.flatMap { + case c: RedisAsyncCommands[K, V] => F.delay(c.watch(keys: _*)) + case _ => conn.async.flatMap(c => F.delay(c.watch(keys: _*))) + } }.void def unwatch: F[Unit] = JRFuture { - conn.async.flatMap(c => F.delay(c.unwatch())) + async.flatMap { + case c: RedisAsyncCommands[K, V] => F.delay(c.unwatch()) + case _ => conn.async.flatMap(c => F.delay(c.unwatch())) + } }.void /******************************* Strings API **********************************/ diff --git a/modules/examples/src/main/scala/dev/profunktor/redis4cats/PubSubDemo.scala b/modules/examples/src/main/scala/dev/profunktor/redis4cats/PubSubDemo.scala index 5a1dc381..7bc68482 100644 --- a/modules/examples/src/main/scala/dev/profunktor/redis4cats/PubSubDemo.scala +++ b/modules/examples/src/main/scala/dev/profunktor/redis4cats/PubSubDemo.scala @@ -26,7 +26,7 @@ import fs2.{ Pipe, Stream } import scala.concurrent.duration._ import scala.util.Random -object Fs2PubSubDemo extends LoggerIOApp { +object PubSubDemo extends LoggerIOApp { import Demo._ diff --git a/modules/examples/src/main/scala/dev/profunktor/redis4cats/PublisherDemo.scala b/modules/examples/src/main/scala/dev/profunktor/redis4cats/PublisherDemo.scala index dbce0700..f1c59afe 100644 --- a/modules/examples/src/main/scala/dev/profunktor/redis4cats/PublisherDemo.scala +++ b/modules/examples/src/main/scala/dev/profunktor/redis4cats/PublisherDemo.scala @@ -26,7 +26,7 @@ import fs2.Stream import scala.concurrent.duration._ import scala.util.Random -object Fs2PublisherDemo extends LoggerIOApp { +object PublisherDemo extends LoggerIOApp { import Demo._ diff --git a/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisClusterStringsDemo.scala b/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisClusterStringsDemo.scala index f73235dc..943a5d39 100644 --- a/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisClusterStringsDemo.scala +++ b/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisClusterStringsDemo.scala @@ -22,7 +22,7 @@ import dev.profunktor.redis4cats.connection._ import dev.profunktor.redis4cats.effect.Log import dev.profunktor.redis4cats.interpreter.Redis -object Fs2RedisClusterStringsDemo extends LoggerIOApp { +object RedisClusterStringsDemo extends LoggerIOApp { import Demo._ diff --git a/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisClusterTransactionsDemo.scala b/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisClusterTransactionsDemo.scala new file mode 100644 index 00000000..56768795 --- /dev/null +++ b/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisClusterTransactionsDemo.scala @@ -0,0 +1,82 @@ +/* + * Copyright 2018-2019 ProfunKtor + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.profunktor.redis4cats + +import cats.effect.{ IO, Resource } +import cats.implicits._ +import dev.profunktor.redis4cats.algebra.RedisCommands +import dev.profunktor.redis4cats.connection._ +import dev.profunktor.redis4cats.domain.RedisClusterClient +import dev.profunktor.redis4cats.effect.Log +import dev.profunktor.redis4cats.interpreter.Redis +import dev.profunktor.redis4cats.transactions._ + +object RedisClusterTransactionsDemo extends LoggerIOApp { + + import Demo._ + + def program(implicit log: Log[IO]): IO[Unit] = { + val key1 = "test1" + + val showResult: String => Option[String] => IO[Unit] = key => + _.fold(putStrLn(s"Not found key: $key"))(s => putStrLn(s)) + + val commandsApi: Resource[IO, (RedisClusterClient, RedisCommands[IO, String, String])] = + for { + uri <- Resource.liftF(RedisURI.make[IO](redisClusterURI)) + client <- RedisClusterClient[IO](uri) + redis <- Redis.cluster[IO, String, String](client, stringCodec) + } yield client -> redis + + commandsApi + .use { + case (client, cmd) => + val nodeCmdResource = + for { + _ <- Resource.liftF(cmd.set(key1, "empty")) + nodeId <- Resource.liftF(RedisClusterClient.nodeId[IO](client, key1)) + nodeCmd <- Redis.clusterByNode[IO, String, String](client, stringCodec, nodeId) + } yield nodeCmd + + // Transactions are only supported on a single node + val notAllowed = + cmd.multi.bracket(_ => cmd.set(key1, "nope") *> cmd.exec)(_ => cmd.discard).handleErrorWith { + case e: OperationNotSupported => putStrLn(e) + } + + notAllowed *> + // Transaction runs in a single shard, where "key1" is stored + nodeCmdResource.use { nodeCmd => + val tx = RedisTransaction(nodeCmd) + + val getter = cmd.get(key1).flatTap(showResult(key1)) + val setter = cmd.set(key1, "foo").start + + val failedSetter = + cmd.set(key1, "qwe").start *> + IO.raiseError(new Exception("boom")) + + val tx1 = tx.run(setter) + val tx2 = tx.run(failedSetter) + + getter *> tx1 *> tx2.attempt *> getter.void + } + } + + } + +} diff --git a/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisGeoDemo.scala b/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisGeoDemo.scala index 0fda0239..9fe06947 100644 --- a/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisGeoDemo.scala +++ b/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisGeoDemo.scala @@ -24,7 +24,7 @@ import dev.profunktor.redis4cats.effects._ import dev.profunktor.redis4cats.interpreter.Redis import io.lettuce.core.GeoArgs -object Fs2RedisGeoDemo extends LoggerIOApp { +object RedisGeoDemo extends LoggerIOApp { import Demo._ diff --git a/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisHashesDemo.scala b/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisHashesDemo.scala index 5ebb2ad7..d12c6f47 100644 --- a/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisHashesDemo.scala +++ b/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisHashesDemo.scala @@ -22,7 +22,7 @@ import dev.profunktor.redis4cats.connection._ import dev.profunktor.redis4cats.effect.Log import dev.profunktor.redis4cats.interpreter.Redis -object Fs2RedisHashesDemo extends LoggerIOApp { +object RedisHashesDemo extends LoggerIOApp { import Demo._ diff --git a/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisListsDemo.scala b/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisListsDemo.scala index 38a58d9c..21f98dfb 100644 --- a/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisListsDemo.scala +++ b/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisListsDemo.scala @@ -22,7 +22,7 @@ import dev.profunktor.redis4cats.connection._ import dev.profunktor.redis4cats.effect.Log import dev.profunktor.redis4cats.interpreter.Redis -object Fs2RedisListsDemo extends LoggerIOApp { +object RedisListsDemo extends LoggerIOApp { import Demo._ diff --git a/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisMasterSlaveStringsDemo.scala b/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisMasterSlaveStringsDemo.scala index 5a0fbc10..d3b24d0f 100644 --- a/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisMasterSlaveStringsDemo.scala +++ b/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisMasterSlaveStringsDemo.scala @@ -22,7 +22,7 @@ import dev.profunktor.redis4cats.domain.{ ReadFrom, RedisMasterSlaveConnection } import dev.profunktor.redis4cats.effect.Log import dev.profunktor.redis4cats.interpreter.Redis -object Fs2RedisMasterSlaveStringsDemo extends LoggerIOApp { +object RedisMasterSlaveStringsDemo extends LoggerIOApp { import Demo._ diff --git a/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisSetsDemo.scala b/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisSetsDemo.scala index 56da903e..66555bee 100644 --- a/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisSetsDemo.scala +++ b/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisSetsDemo.scala @@ -22,7 +22,7 @@ import dev.profunktor.redis4cats.connection._ import dev.profunktor.redis4cats.effect.Log import dev.profunktor.redis4cats.interpreter.Redis -object Fs2RedisSetsDemo extends LoggerIOApp { +object RedisSetsDemo extends LoggerIOApp { import Demo._ diff --git a/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisSortedSetsDemo.scala b/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisSortedSetsDemo.scala index d3030e59..27d54231 100644 --- a/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisSortedSetsDemo.scala +++ b/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisSortedSetsDemo.scala @@ -23,7 +23,7 @@ import dev.profunktor.redis4cats.effect.Log import dev.profunktor.redis4cats.effects.{ Score, ScoreWithValue, ZRange } import dev.profunktor.redis4cats.interpreter.Redis -object Fs2RedisSortedSetsDemo extends LoggerIOApp { +object RedisSortedSetsDemo extends LoggerIOApp { import Demo._ diff --git a/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisStringsDemo.scala b/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisStringsDemo.scala index f4a0bd57..93df95a9 100644 --- a/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisStringsDemo.scala +++ b/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisStringsDemo.scala @@ -22,7 +22,7 @@ import dev.profunktor.redis4cats.connection._ import dev.profunktor.redis4cats.effect.Log import dev.profunktor.redis4cats.interpreter.Redis -object Fs2RedisStringsDemo extends LoggerIOApp { +object RedisStringsDemo extends LoggerIOApp { import Demo._ diff --git a/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisTransactionsDemo.scala b/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisTransactionsDemo.scala index 9175417f..dc0583f7 100644 --- a/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisTransactionsDemo.scala +++ b/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisTransactionsDemo.scala @@ -17,8 +17,7 @@ package dev.profunktor.redis4cats import cats.effect.{ IO, Resource } -import cats.instances.list._ -import cats.syntax.all._ +import cats.implicits._ import dev.profunktor.redis4cats.algebra.RedisCommands import dev.profunktor.redis4cats.connection._ import dev.profunktor.redis4cats.effect.Log diff --git a/modules/examples/src/main/scala/dev/profunktor/redis4cats/StreamingDemo.scala b/modules/examples/src/main/scala/dev/profunktor/redis4cats/StreamingDemo.scala index 770377a1..8740a401 100644 --- a/modules/examples/src/main/scala/dev/profunktor/redis4cats/StreamingDemo.scala +++ b/modules/examples/src/main/scala/dev/profunktor/redis4cats/StreamingDemo.scala @@ -27,7 +27,7 @@ import fs2.Stream import scala.concurrent.duration._ import scala.util.Random -object Fs2StreamingDemo extends LoggerIOApp { +object StreamingDemo extends LoggerIOApp { import Demo._ diff --git a/version.sbt b/version.sbt index c3945bb4..ccb5c052 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -version in ThisBuild := "0.8.0" +version in ThisBuild := "0.8.1"