Skip to content

Commit

Permalink
Merge pull request #13 from gvolpe/feature/upgrade-fs2-1.0
Browse files Browse the repository at this point in the history
Upgrade to cats-effect 1.0.0-RC3 and fs2 1.0.0-M4
  • Loading branch information
gvolpe authored Aug 30, 2018
2 parents 052edb2 + cafa146 commit 1eada57
Show file tree
Hide file tree
Showing 17 changed files with 94 additions and 78 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ name := """fs2-redis-root"""

organization in ThisBuild := "com.github.gvolpe"

version in ThisBuild := "0.1.0"
version in ThisBuild := "0.2.0"

crossScalaVersions in ThisBuild := Seq("2.12.4")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,8 @@ object Fs2Redis {

def stream[F[_]: Concurrent: Log, K, V](client: Fs2RedisClient,
codec: Fs2RedisCodec[K, V],
uri: RedisURI): Stream[F, RedisCommands[F, K, V]] = {
val (acquire, release) = acquireAndRelease(client, codec, uri)
Stream.bracket(acquire)(release)
}
uri: RedisURI): Stream[F, RedisCommands[F, K, V]] =
Stream.resource(apply(client, codec, uri))

def masterSlave[F[_]: Concurrent: Log, K, V](conn: Fs2RedisMasterSlaveConnection[K, V]): F[RedisCommands[F, K, V]] =
new Fs2Redis[F, K, V](conn.underlying).asInstanceOf[RedisCommands[F, K, V]].pure[F]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,7 @@ object Fs2RedisClient {
Resource.make(acquire)(release)
}

def stream[F[_]: Concurrent: Log](uri: RedisURI): Stream[F, Fs2RedisClient] = {
val (acquire, release) = acquireAndRelease(uri)
Stream.bracket(acquire)(release)
}
def stream[F[_]: Concurrent: Log](uri: RedisURI): Stream[F, Fs2RedisClient] =
Stream.resource(apply(uri))

}
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,7 @@ object Fs2RedisMasterSlave {
}

def stream[F[_]: Concurrent: Log, K, V](codec: Fs2RedisCodec[K, V], uris: RedisURI*)(
readFrom: Option[ReadFrom] = None): Stream[F, Fs2RedisMasterSlaveConnection[K, V]] = {
val (acquireClient, releaseClient) = Fs2RedisClient.acquireAndReleaseWithoutUri[F]
Stream.bracket(acquireClient)(releaseClient).flatMap { client =>
val (acquire, release) = acquireAndRelease(client, codec, readFrom, uris: _*)
Stream.bracket(acquire)(release)
}
}

readFrom: Option[ReadFrom] = None): Stream[F, Fs2RedisMasterSlaveConnection[K, V]] =
Stream.resource(apply(codec, uris: _*)(readFrom))

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,17 @@

package com.github.gvolpe.fs2redis

import cats.effect.IO
import cats.effect.{ExitCode, IO, IOApp}
import cats.syntax.apply._
import com.github.gvolpe.fs2redis.interpreter.connection.Fs2RedisClient
import com.github.gvolpe.fs2redis.interpreter.pubsub.Fs2PubSub
import com.github.gvolpe.fs2redis.model.DefaultChannel
import fs2.StreamApp.ExitCode
import fs2.{Sink, Stream, StreamApp}
import fs2.{Sink, Stream}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Random

object Fs2PubSubDemo extends StreamApp[IO] {
object Fs2PubSubDemo extends IOApp {

import Demo._

Expand All @@ -36,24 +35,27 @@ object Fs2PubSubDemo extends StreamApp[IO] {

def sink(name: String): Sink[IO, String] = _.evalMap(x => putStrLn(s"Subscriber: $name >> $x"))

override def stream(args: List[String], requestShutdown: IO[Unit]): Stream[IO, ExitCode] =
def stream(args: List[String]): Stream[IO, Unit] =
for {
client <- Fs2RedisClient.stream[IO](redisURI)
pubSub <- Fs2PubSub.mkPubSubConnection[IO, String, String](client, stringCodec, redisURI)
sub1 = pubSub.subscribe(eventsChannel)
sub2 = pubSub.subscribe(gamesChannel)
pub1 = pubSub.publish(eventsChannel)
pub2 = pubSub.publish(gamesChannel)
rs <- Stream(
sub1 to sink("#events"),
sub2 to sink("#games"),
Stream.awakeEvery[IO](3.seconds) >> Stream.eval(IO(Random.nextInt(100).toString)) to pub1,
Stream.awakeEvery[IO](5.seconds) >> Stream.emit("Pac-Man!") to pub2,
Stream.awakeDelay[IO](11.seconds) >> pubSub.unsubscribe(gamesChannel),
Stream.awakeEvery[IO](6.seconds) >> pubSub
.pubSubSubscriptions(List(eventsChannel, gamesChannel))
.evalMap(x => putStrLn(x.toString))
).join(6).drain
} yield rs
_ <- Stream(
sub1 to sink("#events"),
sub2 to sink("#games"),
Stream.awakeEvery[IO](3.seconds) >> Stream.eval(IO(Random.nextInt(100).toString)) to pub1,
Stream.awakeEvery[IO](5.seconds) >> Stream.emit("Pac-Man!") to pub2,
Stream.awakeDelay[IO](11.seconds) >> pubSub.unsubscribe(gamesChannel),
Stream.awakeEvery[IO](6.seconds) >> pubSub
.pubSubSubscriptions(List(eventsChannel, gamesChannel))
.evalMap(x => putStrLn(x.toString))
).parJoin(6).drain
} yield ()

override def run(args: List[String]): IO[ExitCode] =
stream(args).compile.drain *> IO.pure(ExitCode.Success)

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,34 +16,36 @@

package com.github.gvolpe.fs2redis

import cats.effect.IO
import cats.effect.{ExitCode, IO, IOApp}
import cats.syntax.apply._
import com.github.gvolpe.fs2redis.interpreter.connection.Fs2RedisClient
import com.github.gvolpe.fs2redis.interpreter.pubsub.Fs2PubSub
import com.github.gvolpe.fs2redis.model.DefaultChannel
import fs2.StreamApp.ExitCode
import fs2.{Stream, StreamApp}
import fs2.Stream

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Random

object Fs2PublisherDemo extends StreamApp[IO] {
object Fs2PublisherDemo extends IOApp {

import Demo._

private val eventsChannel = DefaultChannel("events")

override def stream(args: List[String], requestShutdown: IO[Unit]): Stream[IO, ExitCode] =
def stream(args: List[String]): Stream[IO, Unit] =
for {
client <- Fs2RedisClient.stream[IO](redisURI)
pubSub <- Fs2PubSub.mkPublisherConnection[IO, String, String](client, stringCodec, redisURI)
pub1 = pubSub.publish(eventsChannel)
rs <- Stream(
Stream.awakeEvery[IO](3.seconds) >> Stream.eval(IO(Random.nextInt(100).toString)) to pub1,
Stream.awakeEvery[IO](6.seconds) >> pubSub
.pubSubSubscriptions(eventsChannel)
.evalMap(x => putStrLn(x.toString))
).join(2).drain
} yield rs
_ <- Stream(
Stream.awakeEvery[IO](3.seconds) >> Stream.eval(IO(Random.nextInt(100).toString)) to pub1,
Stream.awakeEvery[IO](6.seconds) >> pubSub
.pubSubSubscriptions(eventsChannel)
.evalMap(x => putStrLn(x.toString))
).parJoin(2).drain
} yield ()

override def run(args: List[String]): IO[ExitCode] =
stream(args).compile.drain *> IO.pure(ExitCode.Success)

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,18 @@

package com.github.gvolpe.fs2redis

import cats.effect.IO
import cats.effect.{ExitCode, IO, IOApp}
import cats.syntax.apply._
import cats.syntax.parallel._
import com.github.gvolpe.fs2redis.interpreter.connection.Fs2RedisClient
import com.github.gvolpe.fs2redis.interpreter.streams.Fs2Streaming
import com.github.gvolpe.fs2redis.model.StreamingMessage
import fs2.StreamApp.ExitCode
import fs2.{Stream, StreamApp}
import fs2.Stream

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Random

object Fs2StreamingDemo extends StreamApp[IO] {
object Fs2StreamingDemo extends IOApp {

import Demo._

Expand All @@ -44,16 +43,19 @@ object Fs2StreamingDemo extends StreamApp[IO] {
}
}

override def stream(args: List[String], requestShutdown: IO[Unit]): Stream[IO, ExitCode] =
def stream(args: List[String]): Stream[IO, Unit] =
for {
client <- Fs2RedisClient.stream[IO](redisURI)
streaming <- Fs2Streaming.mkStreamingConnection[IO, String, String](client, stringCodec, redisURI)
source = streaming.read(Set(streamKey1, streamKey2))
appender = streaming.append
rs <- Stream(
source.evalMap(x => putStrLn(x.toString)),
Stream.awakeEvery[IO](3.seconds) >> randomMessage.to(appender)
).join(2).drain
} yield rs
_ <- Stream(
source.evalMap(x => putStrLn(x.toString)),
Stream.awakeEvery[IO](3.seconds) >> randomMessage.to(appender)
).parJoin(2).drain
} yield ()

override def run(args: List[String]): IO[ExitCode] =
stream(args).compile.drain *> IO.pure(ExitCode.Success)

}
4 changes: 2 additions & 2 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ import sbt._
object Dependencies {

object Versions {
val catsEffect = "1.0.0-RC2"
val fs2 = "1.0.0-M1"
val catsEffect = "1.0.0-RC3"
val fs2 = "1.0.0-M4"
val lettuce = "5.1.0.M1"
val scribe = "2.3.0"

Expand Down
4 changes: 3 additions & 1 deletion site/src/main/tut/effects/geo.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import cats.syntax.all._
import com.github.gvolpe.fs2redis.algebra.GeoCommands
import com.github.gvolpe.fs2redis.interpreter.Fs2Redis
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.ExecutionContext
implicit val cs = IO.contextShift(ExecutionContext.global)
val commandsApi: Resource[IO, GeoCommands[IO, String, String]] = {
Fs2Redis[IO, String, String](null, null, null).map(_.asInstanceOf[GeoCommands[IO, String, String]])
Expand Down
4 changes: 3 additions & 1 deletion site/src/main/tut/effects/hashes.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import cats.syntax.all._
import com.github.gvolpe.fs2redis.algebra.HashCommands
import com.github.gvolpe.fs2redis.interpreter.Fs2Redis
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.ExecutionContext
implicit val cs = IO.contextShift(ExecutionContext.global)
val commandsApi: Resource[IO, HashCommands[IO, String, String]] = {
Fs2Redis[IO, String, String](null, null, null).map(_.asInstanceOf[HashCommands[IO, String, String]])
Expand Down
4 changes: 3 additions & 1 deletion site/src/main/tut/effects/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ import com.github.gvolpe.fs2redis.model.{DefaultRedisCodec, Fs2RedisCodec}
import io.lettuce.core.RedisURI
import io.lettuce.core.codec.{RedisCodec, StringCodec}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.ExecutionContext
implicit val cs = IO.contextShift(ExecutionContext.global)
val redisURI: RedisURI = RedisURI.create("redis://localhost")
val stringCodec: Fs2RedisCodec[String, String] = DefaultRedisCodec(StringCodec.UTF8)
Expand Down
4 changes: 3 additions & 1 deletion site/src/main/tut/effects/lists.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import cats.syntax.all._
import com.github.gvolpe.fs2redis.algebra.ListCommands
import com.github.gvolpe.fs2redis.interpreter.Fs2Redis
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.ExecutionContext
implicit val cs = IO.contextShift(ExecutionContext.global)
val commandsApi: Resource[IO, ListCommands[IO, String, String]] = {
Fs2Redis[IO, String, String](null, null, null).map(_.asInstanceOf[ListCommands[IO, String, String]])
Expand Down
4 changes: 3 additions & 1 deletion site/src/main/tut/effects/sets.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import cats.syntax.all._
import com.github.gvolpe.fs2redis.algebra.SetCommands
import com.github.gvolpe.fs2redis.interpreter.Fs2Redis
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.ExecutionContext
implicit val cs = IO.contextShift(ExecutionContext.global)
val commandsApi: Resource[IO, SetCommands[IO, String, String]] = {
Fs2Redis[IO, String, String](null, null, null).map(_.asInstanceOf[SetCommands[IO, String, String]])
Expand Down
4 changes: 3 additions & 1 deletion site/src/main/tut/effects/sortedsets.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import cats.syntax.all._
import com.github.gvolpe.fs2redis.algebra.SortedSetCommands
import com.github.gvolpe.fs2redis.interpreter.Fs2Redis
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.ExecutionContext
implicit val cs = IO.contextShift(ExecutionContext.global)
val commandsApi: Resource[IO, SortedSetCommands[IO, String, Long]] = {
Fs2Redis[IO, String, String](null, null, null).map(_.asInstanceOf[SortedSetCommands[IO, String, Long]])
Expand Down
4 changes: 3 additions & 1 deletion site/src/main/tut/effects/strings.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import cats.syntax.all._
import com.github.gvolpe.fs2redis.algebra.StringCommands
import com.github.gvolpe.fs2redis.interpreter.Fs2Redis
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.ExecutionContext
implicit val cs = IO.contextShift(ExecutionContext.global)
val commandsApi: Resource[IO, StringCommands[IO, String, String]] = {
Fs2Redis[IO, String, String](null, null, null).map(_.asInstanceOf[StringCommands[IO, String, String]])
Expand Down
20 changes: 11 additions & 9 deletions site/src/main/tut/streams/pubsub.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,20 +51,19 @@ When using the `Fs2PubSub` interpreter the `publish` function will be defined as
### PubSub example

```tut:book:silent
import cats.effect.IO
import cats.effect.{ExitCode, IO, IOApp}
import cats.syntax.apply._
import com.github.gvolpe.fs2redis.interpreter.connection.Fs2RedisClient
import com.github.gvolpe.fs2redis.interpreter.pubsub.Fs2PubSub
import com.github.gvolpe.fs2redis.model.{DefaultChannel, DefaultRedisCodec}
import fs2.StreamApp.ExitCode
import fs2.{Sink, Stream, StreamApp}
import fs2.{Sink, Stream}
import io.lettuce.core.RedisURI
import io.lettuce.core.codec.StringCodec
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Random
object Fs2PubSubDemo extends StreamApp[IO] {
object Fs2PubSubDemo extends IOApp {
private val redisURI = RedisURI.create("redis://localhost")
private val stringCodec = DefaultRedisCodec(StringCodec.UTF8)
Expand All @@ -74,15 +73,15 @@ object Fs2PubSubDemo extends StreamApp[IO] {
def sink(name: String): Sink[IO, String] = _.evalMap(x => IO(println(s"Subscriber: $name >> $x")))
override def stream(args: List[String], requestShutdown: IO[Unit]): Stream[IO, ExitCode] =
def stream(args: List[String]): Stream[IO, Unit] =
for {
client <- Fs2RedisClient.stream[IO](redisURI)
pubSub <- Fs2PubSub.mkPubSubConnection[IO, String, String](client, stringCodec, redisURI)
sub1 = pubSub.subscribe(eventsChannel)
sub2 = pubSub.subscribe(gamesChannel)
pub1 = pubSub.publish(eventsChannel)
pub2 = pubSub.publish(gamesChannel)
rs <- Stream(
_ <- Stream(
sub1 to sink("#events"),
sub2 to sink("#games"),
Stream.awakeEvery[IO](3.seconds) >> Stream.eval(IO(Random.nextInt(100).toString)) to pub1,
Expand All @@ -91,8 +90,11 @@ object Fs2PubSubDemo extends StreamApp[IO] {
Stream.awakeEvery[IO](6.seconds) >> pubSub
.pubSubSubscriptions(List(eventsChannel, gamesChannel))
.evalMap(x => IO(println(x)))
).join(6).drain
} yield rs
).parJoin(6).drain
} yield ()
override def run(args: List[String]): IO[ExitCode] =
stream(args).compile.drain *> IO.pure(ExitCode.Success)
}
```
7 changes: 5 additions & 2 deletions site/src/main/tut/streams/streams.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,13 @@ import fs2.Stream
import io.lettuce.core.RedisURI
import io.lettuce.core.codec.StringCodec
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.util.Random
implicit val timer = IO.timer(ExecutionContext.global)
implicit val cs = IO.contextShift(ExecutionContext.global)
val redisURI = RedisURI.create("redis://localhost")
val stringCodec = DefaultRedisCodec(StringCodec.UTF8)
Expand All @@ -83,7 +86,7 @@ for {
rs <- Stream(
source.evalMap(x => putStrLn(x.toString)),
Stream.awakeEvery[IO](3.seconds) >> randomMessage.to(appender)
).join(2).drain
).parJoin(2).drain
} yield rs
```

0 comments on commit 1eada57

Please sign in to comment.