From f3c5248d3faca9ffaa5dd037adcb24c31a1f0ccc Mon Sep 17 00:00:00 2001 From: Roman Makurin Date: Wed, 6 Mar 2024 09:57:12 +0300 Subject: [PATCH] add functions support (#850) * add functions support * add functions support. fix build for 2.12 --------- Co-authored-by: Roman Makurin --- .../redis4cats/algebra/scripts.scala | 20 +++- .../dev/profunktor/redis4cats/effects.scala | 21 +++- .../dev/profunktor/redis4cats/redis.scala | 107 ++++++++++++++++-- .../redis4cats/RedisClusterSpec.scala | 2 + .../dev/profunktor/redis4cats/RedisSpec.scala | 2 + .../profunktor/redis4cats/TestScenarios.scala | 54 +++++++-- 6 files changed, 185 insertions(+), 21 deletions(-) diff --git a/modules/effects/src/main/scala/dev/profunktor/redis4cats/algebra/scripts.scala b/modules/effects/src/main/scala/dev/profunktor/redis4cats/algebra/scripts.scala index 1b091624..f15e7bb7 100644 --- a/modules/effects/src/main/scala/dev/profunktor/redis4cats/algebra/scripts.scala +++ b/modules/effects/src/main/scala/dev/profunktor/redis4cats/algebra/scripts.scala @@ -16,9 +16,9 @@ package dev.profunktor.redis4cats.algebra -import dev.profunktor.redis4cats.effects.ScriptOutputType +import dev.profunktor.redis4cats.effects.{ FlushMode, FunctionRestoreMode, ScriptOutputType } -trait ScriptCommands[F[_], K, V] extends Scripting[F, K, V] +trait ScriptCommands[F[_], K, V] extends Scripting[F, K, V] with Functions[F, K, V] trait Scripting[F[_], K, V] { // these methods don't use varargs as they cause problems with type inference, see: @@ -35,3 +35,19 @@ trait Scripting[F[_], K, V] { def scriptFlush: F[Unit] def digest(script: String): F[String] } + +trait Functions[F[_], K, V] { + def fcall(function: String, output: ScriptOutputType[V], keys: List[K]): F[output.R] + def fcall(function: String, output: ScriptOutputType[V], keys: List[K], values: List[V]): F[output.R] + def fcallReadOnly(function: String, output: ScriptOutputType[V], keys: List[K]): F[output.R] + def fcallReadOnly(function: String, output: ScriptOutputType[V], keys: List[K], values: List[V]): F[output.R] + def functionLoad(functionCode: String): F[String] + def functionLoad(functionCode: String, replace: Boolean): F[String] + def functionDump(): F[Array[Byte]] + def functionRestore(dump: Array[Byte]): F[String] + def functionRestore(dump: Array[Byte], mode: FunctionRestoreMode): F[String] + def functionFlush(flushMode: FlushMode): F[String] + def functionKill(): F[String] + def functionList(): F[List[Map[String, Any]]] + def functionList(libraryName: String): F[List[Map[String, Any]]] +} diff --git a/modules/effects/src/main/scala/dev/profunktor/redis4cats/effects.scala b/modules/effects/src/main/scala/dev/profunktor/redis4cats/effects.scala index f93e3f7b..3829b543 100644 --- a/modules/effects/src/main/scala/dev/profunktor/redis4cats/effects.scala +++ b/modules/effects/src/main/scala/dev/profunktor/redis4cats/effects.scala @@ -82,11 +82,11 @@ object effects { override private[redis4cats] def convert(in: java.util.List[V]): List[V] = in.asScala.toList } - def Status[V]: ScriptOutputType.Aux[V, Unit] = new ScriptOutputType[V] { - type R = Unit + def Status[V]: ScriptOutputType.Aux[V, String] = new ScriptOutputType[V] { + type R = String private[redis4cats] type Underlying = String - override private[redis4cats] val outputType = JScriptOutputType.STATUS - override private[redis4cats] def convert(in: String): Unit = () + override private[redis4cats] val outputType = JScriptOutputType.STATUS + override private[redis4cats] def convert(in: String): String = in } } @@ -104,6 +104,19 @@ object effects { def apply(`match`: String, count: Long): ScanArgs = ScanArgs(Some(`match`), Some(count)) } + sealed trait FlushMode + object FlushMode { + case object Sync extends FlushMode + case object Async extends FlushMode + } + + sealed trait FunctionRestoreMode + object FunctionRestoreMode { + case object Append extends FunctionRestoreMode + case object Flush extends FunctionRestoreMode + case object Replace extends FunctionRestoreMode + } + sealed trait GetExArg object GetExArg { diff --git a/modules/effects/src/main/scala/dev/profunktor/redis4cats/redis.scala b/modules/effects/src/main/scala/dev/profunktor/redis4cats/redis.scala index 0b9af637..88274ab5 100644 --- a/modules/effects/src/main/scala/dev/profunktor/redis4cats/redis.scala +++ b/modules/effects/src/main/scala/dev/profunktor/redis4cats/redis.scala @@ -16,8 +16,6 @@ package dev.profunktor.redis4cats -import java.time.Instant -import java.util.concurrent.TimeUnit import cats._ import cats.data.NonEmptyList import cats.effect.kernel._ @@ -27,10 +25,13 @@ import dev.profunktor.redis4cats.algebra.BitCommandOperation.Overflows import dev.profunktor.redis4cats.config.Redis4CatsConfig import dev.profunktor.redis4cats.connection._ import dev.profunktor.redis4cats.data._ -import dev.profunktor.redis4cats.effect._ import dev.profunktor.redis4cats.effect.FutureLift._ +import dev.profunktor.redis4cats.effect._ import dev.profunktor.redis4cats.effects._ import dev.profunktor.redis4cats.tx.{ TransactionDiscarded, TxRunner, TxStore } +import io.lettuce.core.api.async.RedisAsyncCommands +import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands +import io.lettuce.core.cluster.api.sync.{ RedisClusterCommands => RedisClusterSyncCommands } import io.lettuce.core.{ BitFieldArgs, ClientOptions, @@ -42,18 +43,19 @@ import io.lettuce.core.{ ZAddArgs, ZAggregateArgs, ZStoreArgs, + ExpireArgs => JExpireArgs, + FlushMode => JFlushMode, + FunctionRestoreMode => JFunctionRestoreMode, GetExArgs => JGetExArgs, Limit => JLimit, Range => JRange, ReadFrom => JReadFrom, ScanCursor => JScanCursor, - SetArgs => JSetArgs, - ExpireArgs => JExpireArgs + SetArgs => JSetArgs } -import io.lettuce.core.api.async.RedisAsyncCommands -import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands -import io.lettuce.core.cluster.api.sync.{ RedisClusterCommands => RedisClusterSyncCommands } +import java.time.Instant +import java.util.concurrent.TimeUnit import scala.concurrent.duration._ object Redis { @@ -1289,6 +1291,95 @@ private[redis4cats] class BaseRedis[F[_]: FutureLift: MonadThrow: Log, K, V]( override def digest(script: String): F[String] = async.map(_.digest(script)) + override def fcall(function: String, output: ScriptOutputType[V], keys: List[K]): F[output.R] = + async.flatMap( + _.fcall[output.Underlying]( + function, + output.outputType, + keys: _* + ).futureLift.map(output.convert(_)) + ) + + override def fcall(function: String, output: ScriptOutputType[V], keys: List[K], values: List[V]): F[output.R] = + async.flatMap( + _.fcall[output.Underlying]( + function, + output.outputType, + // The Object requirement comes from the limitations of Java Generics. It is safe to assume K <: Object as + // the underlying JRedisCodec would also only support K <: Object. + keys.toArray[Any].asInstanceOf[Array[K with Object]], + values: _* + ).futureLift.map(output.convert(_)) + ) + + override def fcallReadOnly(function: String, output: ScriptOutputType[V], keys: List[K]): F[output.R] = + async.flatMap( + _.fcallReadOnly[output.Underlying]( + function, + output.outputType, + keys: _* + ).futureLift.map(output.convert(_)) + ) + + override def fcallReadOnly( + function: String, + output: ScriptOutputType[V], + keys: List[K], + values: List[V] + ): F[output.R] = + async.flatMap( + _.fcallReadOnly[output.Underlying]( + function, + output.outputType, + // The Object requirement comes from the limitations of Java Generics. It is safe to assume K <: Object as + // the underlying JRedisCodec would also only support K <: Object. + keys.toArray[Any].asInstanceOf[Array[K with Object]], + values: _* + ).futureLift.map(output.convert(_)) + ) + + override def functionLoad(functionCode: String): F[String] = + async.flatMap(_.functionLoad(functionCode).futureLift) + + override def functionLoad(functionCode: String, replace: Boolean): F[String] = + async.flatMap(_.functionLoad(functionCode, replace).futureLift) + + override def functionDump(): F[Array[Byte]] = + async.flatMap(_.functionDump().futureLift) + + override def functionRestore(dump: Array[Byte]): F[String] = + async.flatMap(_.functionRestore(dump).futureLift) + + override def functionRestore(dump: Array[Byte], mode: FunctionRestoreMode): F[String] = { + val jMode = mode match { + case FunctionRestoreMode.Flush => JFunctionRestoreMode.FLUSH + case FunctionRestoreMode.Append => JFunctionRestoreMode.APPEND + case FunctionRestoreMode.Replace => JFunctionRestoreMode.REPLACE + } + async.flatMap(_.functionRestore(dump, jMode).futureLift) + } + + override def functionFlush(flushMode: FlushMode): F[String] = { + val jFlushMode = flushMode match { + case FlushMode.Sync => JFlushMode.SYNC + case FlushMode.Async => JFlushMode.ASYNC + } + async.flatMap(_.functionFlush(jFlushMode).futureLift) + } + + override def functionKill(): F[String] = + async.flatMap(_.functionKill().futureLift) + + override def functionList(): F[List[Map[String, Any]]] = + async + .flatMap(_.functionList().futureLift) + .map(_.asScala.map(_.asScala.toMap).toList) + + override def functionList(libraryName: String): F[List[Map[String, Any]]] = + async + .flatMap(_.functionList(libraryName).futureLift) + .map(_.asScala.map(_.asScala.toMap).toList) + /** ***************************** HyperLoglog API **********************************/ override def pfAdd(key: K, values: V*): F[Long] = async.flatMap(_.pfadd(key, values: _*).futureLift.map(Long.box(_))) diff --git a/modules/tests/src/test/scala/dev/profunktor/redis4cats/RedisClusterSpec.scala b/modules/tests/src/test/scala/dev/profunktor/redis4cats/RedisClusterSpec.scala index 6ed85eaf..505d0215 100644 --- a/modules/tests/src/test/scala/dev/profunktor/redis4cats/RedisClusterSpec.scala +++ b/modules/tests/src/test/scala/dev/profunktor/redis4cats/RedisClusterSpec.scala @@ -42,6 +42,8 @@ class RedisClusterSpec extends Redis4CatsFunSuite(true) with TestScenarios { test("cluster: scripts")(withRedis(scriptsScenario)) + test("cluster: functions")(withRedis(functionsScenario)) + test("cluster: hyperloglog api")(withRedis(hyperloglogScenario)) // FIXME: The Cluster impl cannot connect to a single node just yet diff --git a/modules/tests/src/test/scala/dev/profunktor/redis4cats/RedisSpec.scala b/modules/tests/src/test/scala/dev/profunktor/redis4cats/RedisSpec.scala index bcf59bc1..148cb48e 100644 --- a/modules/tests/src/test/scala/dev/profunktor/redis4cats/RedisSpec.scala +++ b/modules/tests/src/test/scala/dev/profunktor/redis4cats/RedisSpec.scala @@ -48,6 +48,8 @@ class RedisSpec extends Redis4CatsFunSuite(false) with TestScenarios { test("scripts")(withRedis(scriptsScenario)) + test("functions")(withRedis(functionsScenario)) + test("hyperloglog api")(withRedis(hyperloglogScenario)) test("pattern key sub")(withRedisClient(keyPatternSubScenario)) diff --git a/modules/tests/src/test/scala/dev/profunktor/redis4cats/TestScenarios.scala b/modules/tests/src/test/scala/dev/profunktor/redis4cats/TestScenarios.scala index 0ed8189f..af24af86 100644 --- a/modules/tests/src/test/scala/dev/profunktor/redis4cats/TestScenarios.scala +++ b/modules/tests/src/test/scala/dev/profunktor/redis4cats/TestScenarios.scala @@ -16,15 +16,9 @@ package dev.profunktor.redis4cats -import java.time.Instant - -import scala.concurrent.duration._ - import cats.data.NonEmptyList import cats.effect._ import cats.implicits._ -import fs2.Stream - import dev.profunktor.redis4cats.algebra.BitCommandOperation.{ IncrUnsignedBy, SetUnsigned } import dev.profunktor.redis4cats.algebra.BitCommands import dev.profunktor.redis4cats.connection.RedisClient @@ -32,9 +26,13 @@ import dev.profunktor.redis4cats.data._ import dev.profunktor.redis4cats.effects._ import dev.profunktor.redis4cats.pubsub.PubSub import dev.profunktor.redis4cats.tx._ -import io.lettuce.core.{ GeoArgs, RedisException, ZAggregateArgs } +import fs2.Stream +import io.lettuce.core.{ GeoArgs, RedisCommandExecutionException, RedisException, ZAggregateArgs } import munit.FunSuite +import java.time.Instant +import scala.concurrent.duration._ + trait TestScenarios { self: FunSuite => def locationScenario(redis: RedisCommands[IO, String, String]): IO[Unit] = { @@ -610,6 +608,48 @@ trait TestScenarios { self: FunSuite => } yield () } + def functionsScenario(redis: RedisCommands[IO, String, String]): IO[Unit] = { + val myFunc = + """#!lua name=mylib + | redis.register_function('myfunc', function(keys, args) return args[1] end) + | """.stripMargin + + val myEcho = + """#!lua name=mylib_2 + | local function my_echo(keys, args) + | return args[1] + | end + | redis.register_function{function_name='my_echo',callback=my_echo, flags={ 'no-writes' }} + | """.stripMargin + + for { + _ <- redis.functionFlush(FlushMode.Sync) + _ <- redis.functionLoad(myFunc) + _ <- redis.functionLoad(myFunc).recover({ case _: RedisCommandExecutionException => "" }) + _ <- redis.functionLoad(myFunc, replace = true) + fcallResult <- redis.fcall("myfunc", ScriptOutputType.Status, List("key"), List("Hello")) + _ <- IO(assertEquals(fcallResult, "Hello")) + _ <- redis.functionFlush(FlushMode.Sync) + _ <- redis.functionLoad(myEcho) + fcallReadOnlyResult <- redis.fcallReadOnly("my_echo", ScriptOutputType.Status, List("key"), List("Hello")) + _ <- IO(assertEquals(fcallReadOnlyResult, "Hello")) + _ <- redis.functionFlush(FlushMode.Sync) + _ <- redis.functionLoad(myFunc) + dump <- redis.functionDump() + _ <- redis.functionFlush(FlushMode.Sync) + _ <- redis.functionRestore(dump) + fcallRestoreResult <- redis.fcall("myfunc", ScriptOutputType.Status, List("key"), List("Hello")) + _ <- IO(assertEquals(fcallRestoreResult, "Hello")) + _ <- redis.functionFlush(FlushMode.Sync) + listResult <- redis.functionList() + _ = assertEquals(listResult.size, 0) + _ <- redis.functionLoad(myFunc) + _ <- redis.functionLoad(myEcho) + listResult <- redis.functionList() + _ = assertEquals(listResult.size, 2) + } yield () + } + def hyperloglogScenario(redis: RedisCommands[IO, String, String]): IO[Unit] = { val key = "hll" val key2 = "hll2"