Skip to content

Commit

Permalink
Merge pull request #95 from profunktor/feature/cluster-functions
Browse files Browse the repository at this point in the history
Adding support for single-shard transactions
  • Loading branch information
gvolpe authored May 19, 2019
2 parents 5ed302d + 87adc43 commit 18aa968
Show file tree
Hide file tree
Showing 18 changed files with 192 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ package dev.profunktor.redis4cats.connection

import cats.effect.{ Concurrent, ContextShift, Resource, Sync }
import cats.implicits._
import dev.profunktor.redis4cats.domain.{ LiveRedisClusterClient, RedisClusterClient }
import dev.profunktor.redis4cats.domain.{ LiveRedisClusterClient, NodeId, RedisClusterClient }
import dev.profunktor.redis4cats.effect.{ JRFuture, Log }
import io.lettuce.core.{ RedisURI => JRedisURI }
import io.lettuce.core.cluster.{ RedisClusterClient => JClusterClient }
import io.lettuce.core.cluster.{ SlotHash, RedisClusterClient => JClusterClient }
import io.lettuce.core.cluster.models.partitions.{ Partitions => JPartitions }

import scala.collection.JavaConverters._

Expand Down Expand Up @@ -53,4 +54,15 @@ object RedisClusterClient {
Resource.make(acquire)(release)
}

def nodeId[F[_]: Sync](
client: RedisClusterClient,
keyName: String
): F[NodeId] =
Sync[F].delay(SlotHash.getSlot(keyName)).flatMap { slot =>
partitions(client).map(_.getPartitionBySlot(slot).getNodeId).map(NodeId)
}

def partitions[F[_]: Sync](client: RedisClusterClient): F[JPartitions] =
Sync[F].delay(client.underlying.getPartitions())

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,48 @@ package dev.profunktor.redis4cats.connection

import cats.effect.{ Concurrent, ContextShift, Sync }
import cats.syntax.all._
import dev.profunktor.redis4cats.domain.NodeId
import dev.profunktor.redis4cats.effect.JRFuture
import io.lettuce.core.api.StatefulRedisConnection
import io.lettuce.core.api.async.RedisAsyncCommands
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection
import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands
import scala.util.control.NoStackTrace

case class OperationNotSupported(value: String) extends NoStackTrace {
override def toString(): String = s"OperationNotSupported($value)"
}

private[redis4cats] trait RedisConnection[F[_], K, V] {
def async: F[RedisAsyncCommands[K, V]]
def clusterAsync: F[RedisClusterAsyncCommands[K, V]]
def close: F[Unit]
def byNode(nodeId: NodeId): F[RedisAsyncCommands[K, V]]
}

private[redis4cats] class RedisStatefulConnection[F[_]: Concurrent: ContextShift, K, V](
conn: StatefulRedisConnection[K, V]
) extends RedisConnection[F, K, V] {
override def async: F[RedisAsyncCommands[K, V]] = Sync[F].delay(conn.async())
override def clusterAsync: F[RedisClusterAsyncCommands[K, V]] =
Sync[F].raiseError(new Exception("Operation not supported"))
override def close: F[Unit] = JRFuture.fromCompletableFuture(Sync[F].delay(conn.closeAsync())).void
def async: F[RedisAsyncCommands[K, V]] = Sync[F].delay(conn.async())
def clusterAsync: F[RedisClusterAsyncCommands[K, V]] =
Sync[F].raiseError(OperationNotSupported("Running in a single node"))
def close: F[Unit] = JRFuture.fromCompletableFuture(Sync[F].delay(conn.closeAsync())).void
def byNode(nodeId: NodeId): F[RedisAsyncCommands[K, V]] =
Sync[F].raiseError(OperationNotSupported("Running in a single node"))
}

private[redis4cats] class RedisStatefulClusterConnection[F[_]: Concurrent: ContextShift, K, V](
conn: StatefulRedisClusterConnection[K, V]
) extends RedisConnection[F, K, V] {
override def async: F[RedisAsyncCommands[K, V]] =
Sync[F].raiseError(new Exception("Transactions are not supported on a cluster"))
override def clusterAsync: F[RedisClusterAsyncCommands[K, V]] = Sync[F].delay(conn.async())
override def close: F[Unit] = JRFuture.fromCompletableFuture(Sync[F].delay(conn.closeAsync())).void
def async: F[RedisAsyncCommands[K, V]] =
Sync[F].raiseError(
OperationNotSupported("Transactions are not supported in a cluster. You must select a single node.")
)
def clusterAsync: F[RedisClusterAsyncCommands[K, V]] = Sync[F].delay(conn.async())
def close: F[Unit] =
JRFuture.fromCompletableFuture(Sync[F].delay(conn.closeAsync())).void
def byNode(nodeId: NodeId): F[RedisAsyncCommands[K, V]] =
JRFuture.fromCompletableFuture(Sync[F].delay(conn.getConnectionAsync(nodeId.value))).flatMap { stateful =>
Sync[F].delay(stateful.async())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ object domain {
}
case class LiveRedisCodec[K, V](underlying: JCodec[K, V]) extends RedisCodec[K, V]

case class NodeId(value: String) extends AnyVal

object RedisCodec {
val Ascii = LiveRedisCodec(StringCodec.ASCII)
val Utf8 = LiveRedisCodec(StringCodec.UTF8)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import dev.profunktor.redis4cats.effect.{ JRFuture, Log }
import dev.profunktor.redis4cats.effects._
import io.lettuce.core.{ Limit => JLimit, Range => JRange, RedisURI => JRedisURI }
import io.lettuce.core.{ GeoArgs, GeoRadiusStoreArgs, GeoWithin, ScoredValue, ZAddArgs, ZStoreArgs }
import io.lettuce.core.api.async.RedisAsyncCommands
import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.FiniteDuration
Expand Down Expand Up @@ -63,6 +64,29 @@ object Redis {
(acquire, release)
}

private[redis4cats] def acquireAndReleaseClusterByNode[F[_]: Concurrent: ContextShift: Log, K, V](
client: RedisClusterClient,
codec: RedisCodec[K, V],
nodeId: NodeId
): (F[BaseRedis[F, K, V]], BaseRedis[F, K, V] => F[Unit]) = {
val acquire = JRFuture
.fromCompletableFuture {
Sync[F].delay(client.underlying.connectAsync[K, V](codec.underlying))
}
.map { c =>
new BaseRedis[F, K, V](new RedisStatefulClusterConnection(c), cluster = true) {
override def async: F[RedisClusterAsyncCommands[K, V]] =
if (cluster) conn.byNode(nodeId).widen[RedisClusterAsyncCommands[K, V]]
else conn.async.widen[RedisClusterAsyncCommands[K, V]]
}
}

val release: BaseRedis[F, K, V] => F[Unit] = c =>
Log[F].info(s"Releasing single-shard cluster Commands connection: ${client.underlying}") *> c.conn.close

(acquire, release)
}

def apply[F[_]: Concurrent: ContextShift: Log, K, V](
client: RedisClient,
codec: RedisCodec[K, V],
Expand All @@ -80,6 +104,15 @@ object Redis {
Resource.make(acquire)(release).map(_.asInstanceOf[RedisCommands[F, K, V]])
}

def clusterByNode[F[_]: Concurrent: ContextShift: Log, K, V](
clusterClient: RedisClusterClient,
codec: RedisCodec[K, V],
nodeId: NodeId
): Resource[F, RedisCommands[F, K, V]] = {
val (acquire, release) = acquireAndReleaseClusterByNode(clusterClient, codec, nodeId)
Resource.make(acquire)(release).map(_.asInstanceOf[RedisCommands[F, K, V]])
}

def masterSlave[F[_]: Concurrent: ContextShift: Log, K, V](
conn: RedisMasterSlaveConnection[K, V]
): F[RedisCommands[F, K, V]] =
Expand All @@ -96,45 +129,60 @@ private[redis4cats] class BaseRedis[F[_]: ContextShift, K, V](

import scala.collection.JavaConverters._

private val async: F[RedisClusterAsyncCommands[K, V]] =
def async: F[RedisClusterAsyncCommands[K, V]] =
if (cluster) conn.clusterAsync else conn.async.widen[RedisClusterAsyncCommands[K, V]]

override def del(key: K*): F[Unit] =
def del(key: K*): F[Unit] =
JRFuture {
async.flatMap(c => F.delay(c.del(key: _*)))
}.void

override def expire(key: K, expiresIn: FiniteDuration): F[Unit] =
def expire(key: K, expiresIn: FiniteDuration): F[Unit] =
JRFuture {
async.flatMap(c => F.delay(c.expire(key, expiresIn.toSeconds)))
}.void

/******************************* Transactions API **********************************/
// When in a cluster, transactions should run against a single node, therefore we use `conn.async` instead of `conn.clusterAsync`.

override def multi: F[Unit] =
def multi: F[Unit] =
JRFuture {
conn.async.flatMap(c => F.delay(c.multi()))
async.flatMap {
case c: RedisAsyncCommands[K, V] => F.delay(c.multi())
case _ => conn.async.flatMap(c => F.delay(c.multi()))
}
}.void

def exec: F[Unit] =
JRFuture {
conn.async.flatMap(c => F.delay(c.exec()))
async.flatMap {
case c: RedisAsyncCommands[K, V] => F.delay(c.exec())
case _ => conn.async.flatMap(c => F.delay(c.exec()))
}
}.void

def discard: F[Unit] =
JRFuture {
conn.async.flatMap(c => F.delay(c.discard()))
async.flatMap {
case c: RedisAsyncCommands[K, V] => F.delay(c.discard())
case _ => conn.async.flatMap(c => F.delay(c.discard()))
}
}.void

def watch(keys: K*): F[Unit] =
JRFuture {
conn.async.flatMap(c => F.delay(c.watch(keys: _*)))
async.flatMap {
case c: RedisAsyncCommands[K, V] => F.delay(c.watch(keys: _*))
case _ => conn.async.flatMap(c => F.delay(c.watch(keys: _*)))
}
}.void

def unwatch: F[Unit] =
JRFuture {
conn.async.flatMap(c => F.delay(c.unwatch()))
async.flatMap {
case c: RedisAsyncCommands[K, V] => F.delay(c.unwatch())
case _ => conn.async.flatMap(c => F.delay(c.unwatch()))
}
}.void

/******************************* Strings API **********************************/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import fs2.{ Pipe, Stream }
import scala.concurrent.duration._
import scala.util.Random

object Fs2PubSubDemo extends LoggerIOApp {
object PubSubDemo extends LoggerIOApp {

import Demo._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import fs2.Stream
import scala.concurrent.duration._
import scala.util.Random

object Fs2PublisherDemo extends LoggerIOApp {
object PublisherDemo extends LoggerIOApp {

import Demo._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import dev.profunktor.redis4cats.connection._
import dev.profunktor.redis4cats.effect.Log
import dev.profunktor.redis4cats.interpreter.Redis

object Fs2RedisClusterStringsDemo extends LoggerIOApp {
object RedisClusterStringsDemo extends LoggerIOApp {

import Demo._

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright 2018-2019 ProfunKtor
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package dev.profunktor.redis4cats

import cats.effect.{ IO, Resource }
import cats.implicits._
import dev.profunktor.redis4cats.algebra.RedisCommands
import dev.profunktor.redis4cats.connection._
import dev.profunktor.redis4cats.domain.RedisClusterClient
import dev.profunktor.redis4cats.effect.Log
import dev.profunktor.redis4cats.interpreter.Redis
import dev.profunktor.redis4cats.transactions._

object RedisClusterTransactionsDemo extends LoggerIOApp {

import Demo._

def program(implicit log: Log[IO]): IO[Unit] = {
val key1 = "test1"

val showResult: String => Option[String] => IO[Unit] = key =>
_.fold(putStrLn(s"Not found key: $key"))(s => putStrLn(s))

val commandsApi: Resource[IO, (RedisClusterClient, RedisCommands[IO, String, String])] =
for {
uri <- Resource.liftF(RedisURI.make[IO](redisClusterURI))
client <- RedisClusterClient[IO](uri)
redis <- Redis.cluster[IO, String, String](client, stringCodec)
} yield client -> redis

commandsApi
.use {
case (client, cmd) =>
val nodeCmdResource =
for {
_ <- Resource.liftF(cmd.set(key1, "empty"))
nodeId <- Resource.liftF(RedisClusterClient.nodeId[IO](client, key1))
nodeCmd <- Redis.clusterByNode[IO, String, String](client, stringCodec, nodeId)
} yield nodeCmd

// Transactions are only supported on a single node
val notAllowed =
cmd.multi.bracket(_ => cmd.set(key1, "nope") *> cmd.exec)(_ => cmd.discard).handleErrorWith {
case e: OperationNotSupported => putStrLn(e)
}

notAllowed *>
// Transaction runs in a single shard, where "key1" is stored
nodeCmdResource.use { nodeCmd =>
val tx = RedisTransaction(nodeCmd)

val getter = cmd.get(key1).flatTap(showResult(key1))
val setter = cmd.set(key1, "foo").start

val failedSetter =
cmd.set(key1, "qwe").start *>
IO.raiseError(new Exception("boom"))

val tx1 = tx.run(setter)
val tx2 = tx.run(failedSetter)

getter *> tx1 *> tx2.attempt *> getter.void
}
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import dev.profunktor.redis4cats.effects._
import dev.profunktor.redis4cats.interpreter.Redis
import io.lettuce.core.GeoArgs

object Fs2RedisGeoDemo extends LoggerIOApp {
object RedisGeoDemo extends LoggerIOApp {

import Demo._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import dev.profunktor.redis4cats.connection._
import dev.profunktor.redis4cats.effect.Log
import dev.profunktor.redis4cats.interpreter.Redis

object Fs2RedisHashesDemo extends LoggerIOApp {
object RedisHashesDemo extends LoggerIOApp {

import Demo._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import dev.profunktor.redis4cats.connection._
import dev.profunktor.redis4cats.effect.Log
import dev.profunktor.redis4cats.interpreter.Redis

object Fs2RedisListsDemo extends LoggerIOApp {
object RedisListsDemo extends LoggerIOApp {

import Demo._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import dev.profunktor.redis4cats.domain.{ ReadFrom, RedisMasterSlaveConnection }
import dev.profunktor.redis4cats.effect.Log
import dev.profunktor.redis4cats.interpreter.Redis

object Fs2RedisMasterSlaveStringsDemo extends LoggerIOApp {
object RedisMasterSlaveStringsDemo extends LoggerIOApp {

import Demo._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import dev.profunktor.redis4cats.connection._
import dev.profunktor.redis4cats.effect.Log
import dev.profunktor.redis4cats.interpreter.Redis

object Fs2RedisSetsDemo extends LoggerIOApp {
object RedisSetsDemo extends LoggerIOApp {

import Demo._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import dev.profunktor.redis4cats.effect.Log
import dev.profunktor.redis4cats.effects.{ Score, ScoreWithValue, ZRange }
import dev.profunktor.redis4cats.interpreter.Redis

object Fs2RedisSortedSetsDemo extends LoggerIOApp {
object RedisSortedSetsDemo extends LoggerIOApp {

import Demo._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import dev.profunktor.redis4cats.connection._
import dev.profunktor.redis4cats.effect.Log
import dev.profunktor.redis4cats.interpreter.Redis

object Fs2RedisStringsDemo extends LoggerIOApp {
object RedisStringsDemo extends LoggerIOApp {

import Demo._

Expand Down
Loading

0 comments on commit 18aa968

Please sign in to comment.