A reactive streams implementation for fs2
Add the following to your build.sbt
libraryDependencies += "com.github.zainab-ali" %% "fs2-reactive-streams" % "0.8.0"
This is dependent on version 1.0.0-M5
of fs2.
import cats._, cats.effect._, fs2._
// import cats._
// import cats.effect._
// import fs2._
import fs2.interop.reactivestreams._
// import fs2.interop.reactivestreams._
import scala.concurrent.ExecutionContext
// import scala.concurrent.ExecutionContext
implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
// contextShift: cats.effect.ContextShift[cats.effect.IO] = cats.effect.internals.IOContextShift@404c8582
val upstream = Stream(1, 2, 3).covary[IO]
// upstream: fs2.Stream[cats.effect.IO,Int] = Stream(..)
val publisher = upstream.toUnicastPublisher
// publisher: fs2.interop.reactivestreams.StreamUnicastPublisher[cats.effect.IO,Int] = fs2.interop.reactivestreams.StreamUnicastPublisher@713e59c9
val downstream = publisher.toStream[IO]
// downstream: fs2.Stream[cats.effect.IO,Int] = Stream(..)
// res1: Vector[Int] = Vector(1, 2, 3)
The reactive streams initiative is complicated, mutable and unsafe - it is not something that is desired for use over fs2. But there are times when we need use fs2 in conjunction with a different streaming library, and this is where reactive streams shines.
Any reactive streams system can interop with any other reactive streams system by exposing an org.reactivestreams.Publisher
or an org.reactivestreams.Subscriber
This library provides instances of reactivestreams compliant publishers and subscribers to ease interop with other streaming libraries.
You may require the following imports
import cats._, cats.effect._, fs2._
import fs2.interop.reactivestreams._
import scala.concurrent.ExecutionContext
A ContextShift
instance is necessary when working with IO
implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
To convert a Stream
into a downstream unicast org.reactivestreams.Publisher
val stream = Stream(1, 2, 3).covary[IO]
To convert an upstream org.reactivestreams.Publisher
into a Stream
val publisher: org.reactivestreams.Publisher[Int] = Stream(1, 2, 3).covary[IO].toUnicastPublisher
A unicast publisher must have a single subscriber only.
Import the Akka streams dsl:
import akka._
import akka.stream._
import akka.stream.scaladsl._
import akka.actor.ActorSystem
import cats.effect._
import fs2.Stream
import fs2.interop.reactivestreams._
import scala.concurrent.ExecutionContext
implicit val system: ActorSystem = ActorSystem("akka-streams-example")
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
To convert from a Source
to a Stream
val source = Source(1 to 5)
// source: akka.stream.scaladsl.Source[Int,akka.NotUsed] = Source(SourceShape(StatefulMapConcat.out(359203115)))
val publisher = source.runWith(Sink.asPublisher[Int](fanout = false))
// publisher: org.reactivestreams.Publisher[Int] = VirtualProcessor(state = Publisher[StatefulMapConcat.out(359203115)])
val stream = publisher.toStream[IO]
// stream: fs2.Stream[cats.effect.IO,Int] = Stream(..)
// res0: Vector[Int] = Vector(1, 2, 3, 4, 5)
To convert from a Stream
to a Source
import scala.concurrent.ExecutionContext.global
// import scala.concurrent.ExecutionContext.global
val stream = Stream.emits((1 to 5).toSeq).covary[IO]
// stream: fs2.Stream[cats.effect.IO,Int] = Stream(..)
val source = Source.fromPublisher(stream.toUnicastPublisher)
// source: akka.stream.scaladsl.Source[Int,akka.NotUsed] = Source(SourceShape(PublisherSource.out(491734460)))
// res1: scala.collection.immutable.Seq[Int] = Vector(1, 2, 3, 4, 5)
Patch releases (e.g 0.2.7
to 0.2.8
) are binary compatible. If you're concerned about a broken release, please check the CHANGELOG for more details.
fs2 | fs2-reactive-streams | status |
1.0.0-M5 | 0.8.0 | current |
1.0.0-M4 | 0.7.0 | current |
1.0.0-M1 | 0.6.0 | current |
0.10.1 | 0.5.1 | current |
0.10.0 | 0.5.0 | current |
0.9.4 | 0.1.1 | current |
The following people have taken their time and effort to improve fs2-reactive-streams.
- Ross A Baker @rossabaker
- Bjørn Madsen @aeons
- Fabio Labella @SystemFw
Thank you for your help!
fs2-reactive-streams is licensed under the Apache License 2.0.
Many thanks go to Ross Baker who took the first step in making a reactive streams implementation in http4s. Without this, fs2-reactive-streams would have been much harder to write.