From 31e23f17b5fa272a26d62535cad356e0bf4b243f Mon Sep 17 00:00:00 2001 From: Gabriel Volpe Date: Sat, 8 Sep 2018 21:17:03 +0900 Subject: [PATCH] Upgraded cats-effect and fs2. Bumping version to 0.3.0 --- build.sbt | 2 +- .../gvolpe/fs2redis/interpreter/pubsub/Fs2PubSub.scala | 6 +++--- .../fs2redis/interpreter/pubsub/Fs2PubSubCommands.scala | 2 +- .../gvolpe/fs2redis/interpreter/pubsub/Fs2Subscriber.scala | 2 +- .../interpreter/pubsub/internals/Fs2PubSubInternals.scala | 6 +++--- .../fs2redis/interpreter/pubsub/internals/package.scala | 5 ++--- project/Dependencies.scala | 4 ++-- 7 files changed, 13 insertions(+), 14 deletions(-) diff --git a/build.sbt b/build.sbt index 171d126b..d42c230b 100644 --- a/build.sbt +++ b/build.sbt @@ -7,7 +7,7 @@ name := """fs2-redis-root""" organization in ThisBuild := "com.github.gvolpe" -version in ThisBuild := "0.2.0" +version in ThisBuild := "0.3.0" crossScalaVersions in ThisBuild := Seq("2.12.6") diff --git a/core/src/main/scala/com/github/gvolpe/fs2redis/interpreter/pubsub/Fs2PubSub.scala b/core/src/main/scala/com/github/gvolpe/fs2redis/interpreter/pubsub/Fs2PubSub.scala index c9cff344..8b3cdaa3 100644 --- a/core/src/main/scala/com/github/gvolpe/fs2redis/interpreter/pubsub/Fs2PubSub.scala +++ b/core/src/main/scala/com/github/gvolpe/fs2redis/interpreter/pubsub/Fs2PubSub.scala @@ -23,7 +23,7 @@ import com.github.gvolpe.fs2redis.algebra.{PubSubCommands, PublishCommands, Subs import com.github.gvolpe.fs2redis.model._ import com.github.gvolpe.fs2redis.util.{JRFuture, Log} import fs2.Stream -import fs2.async.mutable +import fs2.concurrent.Topic import io.lettuce.core.RedisURI import io.lettuce.core.pubsub.StatefulRedisPubSubConnection @@ -58,7 +58,7 @@ object Fs2PubSub { val (acquire, release) = acquireAndRelease[F, K, V](client, codec, uri) // One exclusive connection for subscriptions and another connection for publishing / stats for { - state <- Stream.eval(Ref.of(Map.empty[K, mutable.Topic[F, Option[V]]])) + state <- Stream.eval(Ref.of(Map.empty[K, Topic[F, Option[V]]])) sConn <- Stream.bracket(acquire)(release) pConn <- Stream.bracket(acquire)(release) subs <- Stream.emit(new Fs2PubSubCommands[F, K, V](state, sConn, pConn)) @@ -91,7 +91,7 @@ object Fs2PubSub { val (acquire, release) = acquireAndRelease[F, K, V](client, codec, uri) for { - state <- Stream.eval(Ref.of(Map.empty[K, mutable.Topic[F, Option[V]]])) + state <- Stream.eval(Ref.of(Map.empty[K, Topic[F, Option[V]]])) sConn <- Stream.bracket(acquire)(release) } yield new Fs2Subscriber[F, K, V](state, sConn) } diff --git a/core/src/main/scala/com/github/gvolpe/fs2redis/interpreter/pubsub/Fs2PubSubCommands.scala b/core/src/main/scala/com/github/gvolpe/fs2redis/interpreter/pubsub/Fs2PubSubCommands.scala index ee8ccd90..5fe245b1 100644 --- a/core/src/main/scala/com/github/gvolpe/fs2redis/interpreter/pubsub/Fs2PubSubCommands.scala +++ b/core/src/main/scala/com/github/gvolpe/fs2redis/interpreter/pubsub/Fs2PubSubCommands.scala @@ -23,7 +23,7 @@ import com.github.gvolpe.fs2redis.algebra.{PubSubCommands, PubSubStats, Subscrib import com.github.gvolpe.fs2redis.interpreter.pubsub.internals.{Fs2PubSubInternals, PubSubState} import com.github.gvolpe.fs2redis.model import com.github.gvolpe.fs2redis.model.{Fs2RedisChannel, Subscription} -import com.github.gvolpe.fs2redis.util.{JRFuture, Log} +import com.github.gvolpe.fs2redis.util.JRFuture import fs2.Stream import io.lettuce.core.pubsub.StatefulRedisPubSubConnection diff --git a/core/src/main/scala/com/github/gvolpe/fs2redis/interpreter/pubsub/Fs2Subscriber.scala b/core/src/main/scala/com/github/gvolpe/fs2redis/interpreter/pubsub/Fs2Subscriber.scala index e5a50d37..2532f029 100644 --- a/core/src/main/scala/com/github/gvolpe/fs2redis/interpreter/pubsub/Fs2Subscriber.scala +++ b/core/src/main/scala/com/github/gvolpe/fs2redis/interpreter/pubsub/Fs2Subscriber.scala @@ -24,7 +24,7 @@ import com.github.gvolpe.fs2redis.interpreter.pubsub.internals.{Fs2PubSubInterna import com.github.gvolpe.fs2redis.model.Fs2RedisChannel import com.github.gvolpe.fs2redis.util.JRFuture import fs2.Stream -import fs2.async.mutable.Topic +import fs2.concurrent.Topic import io.lettuce.core.pubsub.StatefulRedisPubSubConnection class Fs2Subscriber[F[_], K, V](state: Ref[F, PubSubState[F, K, V]], subConnection: StatefulRedisPubSubConnection[K, V])( diff --git a/core/src/main/scala/com/github/gvolpe/fs2redis/interpreter/pubsub/internals/Fs2PubSubInternals.scala b/core/src/main/scala/com/github/gvolpe/fs2redis/interpreter/pubsub/internals/Fs2PubSubInternals.scala index 334dc10c..e1cfe449 100644 --- a/core/src/main/scala/com/github/gvolpe/fs2redis/interpreter/pubsub/internals/Fs2PubSubInternals.scala +++ b/core/src/main/scala/com/github/gvolpe/fs2redis/interpreter/pubsub/internals/Fs2PubSubInternals.scala @@ -22,14 +22,14 @@ import cats.effect.syntax.effect._ import cats.syntax.all._ import com.github.gvolpe.fs2redis.model.Fs2RedisChannel import com.github.gvolpe.fs2redis.util.Log -import fs2.async.mutable +import fs2.concurrent.Topic import io.lettuce.core.pubsub.{RedisPubSubListener, StatefulRedisPubSubConnection} object Fs2PubSubInternals { private[fs2redis] def defaultListener[F[_]: ConcurrentEffect, K, V]( fs2RedisChannel: Fs2RedisChannel[K], - topic: mutable.Topic[F, Option[V]]): RedisPubSubListener[K, V] = + topic: Topic[F, Option[V]]): RedisPubSubListener[K, V] = new RedisPubSubListener[K, V] { override def message(channel: K, message: V): Unit = if (channel == fs2RedisChannel.value) { @@ -49,7 +49,7 @@ object Fs2PubSubInternals { st.get(channel.value) .fold { for { - topic <- fs2.async.topic[F, Option[V]](None) + topic <- Topic[F, Option[V]](None) listener = defaultListener(channel, topic) _ <- L.info(s"Creating listener for channel: $channel") _ <- F.delay(subConnection.addListener(listener)) diff --git a/core/src/main/scala/com/github/gvolpe/fs2redis/interpreter/pubsub/internals/package.scala b/core/src/main/scala/com/github/gvolpe/fs2redis/interpreter/pubsub/internals/package.scala index 7cae60ad..5bd62fad 100644 --- a/core/src/main/scala/com/github/gvolpe/fs2redis/interpreter/pubsub/internals/package.scala +++ b/core/src/main/scala/com/github/gvolpe/fs2redis/interpreter/pubsub/internals/package.scala @@ -17,11 +17,10 @@ package com.github.gvolpe.fs2redis.interpreter.pubsub import com.github.gvolpe.fs2redis.model.Fs2RedisChannel -import fs2.async.mutable -import fs2.async.mutable.Topic +import fs2.concurrent.Topic package object internals { - private[pubsub] type PubSubState[F[_], K, V] = Map[K, mutable.Topic[F, Option[V]]] + private[pubsub] type PubSubState[F[_], K, V] = Map[K, Topic[F, Option[V]]] private[pubsub] type GetOrCreateTopicListener[F[_], K, V] = Fs2RedisChannel[K] => PubSubState[F, K, V] => F[Topic[F, Option[V]]] } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 69a11ead..b5dbfc9e 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -3,8 +3,8 @@ import sbt._ object Dependencies { object Versions { - val catsEffect = "1.0.0-RC3" - val fs2 = "1.0.0-M4" + val catsEffect = "1.0.0" + val fs2 = "1.0.0-M5" val lettuce = "5.1.0.M1" val scribe = "2.6.0"