This guide explains the key differences between Kotlin coroutines and reactive streams and shows how they can be used together for the greater good. Prior familiarity with the basic coroutine concepts that are covered in the Guide to kotlinx.coroutines is not required, but is a big plus. If you are familiar with reactive streams, you may find this guide a better introduction to the world of coroutines.
There are several modules in the kotlinx.coroutines project that are related to reactive streams (a sample dependency declaration is sketched after this list):
- kotlinx-coroutines-reactive -- utilities for Reactive Streams
- kotlinx-coroutines-reactor -- utilities for Reactor
- kotlinx-coroutines-rx2 -- utilities for RxJava 2.x
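If you build with Gradle, a dependency on one of these modules can be declared as in the following sketch; the module and version shown here are purely illustrative, so substitute the module you need and the kotlinx.coroutines version your project uses:
dependencies {
    // illustrative coordinates: pick kotlinx-coroutines-reactive, kotlinx-coroutines-reactor, or kotlinx-coroutines-rx2
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-rx2:1.3.9") // version shown for illustration only
}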
This guide is mostly based on the Reactive Streams specification and uses its Publisher interface, with some examples based on RxJava 2.x, which implements the Reactive Streams specification.
You are welcome to clone the kotlinx.coroutines project from GitHub to your workstation in order to run all the presented examples. They are contained in the reactive/kotlinx-coroutines-rx2/test/guide directory of the project.
This section outlines key differences between reactive streams and coroutine-based channels.
A Channel is a concept somewhat similar to the following reactive stream classes:
- Reactive Streams Publisher;
- RxJava 1.x Observable;
- RxJava 2.x Flowable, which implements Publisher.
They all describe an asynchronous stream of elements (aka items in Rx), either infinite or finite, and all of them support backpressure.
However, the Channel
always represents a hot stream of items, using Rx terminology. Elements are being sent
into the channel by producer coroutines and are received from it by consumer coroutines.
Every receive invocation consumes an element from the channel.
Let us illustrate it with the following example:
fun main() = runBlocking<Unit> {
    // create a channel that produces numbers from 1 to 3 with 200ms delays between them
    val source = produce<Int> {
        println("Begin") // mark the beginning of this coroutine in output
        for (x in 1..3) {
            delay(200) // wait for 200ms
            send(x) // send number x to the channel
        }
    }
    // print elements from the source
    println("Elements:")
    source.consumeEach { // consume elements from it
        println(it)
    }
    // print elements from the source AGAIN
    println("Again:")
    source.consumeEach { // consume elements from it
        println(it)
    }
}
You can get full code here.
This code produces the following output:
Elements:
Begin
1
2
3
Again:
Notice how the "Begin" line was printed just once, because the produce coroutine builder, when it is executed, launches one coroutine to produce a stream of elements. All the produced elements are consumed with ReceiveChannel.consumeEach extension function. There is no way to receive the elements from this channel again. The channel is closed when the producer coroutine is over and an attempt to receive from it again cannot receive anything.
Let us rewrite this code using the publish coroutine builder from the kotlinx-coroutines-reactive module instead of produce from the kotlinx-coroutines-core module. The code stays the same, but where source used to have the ReceiveChannel type, it now has the reactive streams' Publisher type, and where consumeEach was used to consume elements from the channel, collect is now used to collect elements from the publisher.
fun main() = runBlocking<Unit> {
    // create a publisher that produces numbers from 1 to 3 with 200ms delays between them
    val source = publish<Int> {
    //           ^^^^^^^  <--- Difference from the previous examples is here
        println("Begin") // mark the beginning of this coroutine in output
        for (x in 1..3) {
            delay(200) // wait for 200ms
            send(x) // send number x to the channel
        }
    }
    // print elements from the source
    println("Elements:")
    source.collect { // collect elements from it
        println(it)
    }
    // print elements from the source AGAIN
    println("Again:")
    source.collect { // collect elements from it
        println(it)
    }
}
You can get full code here.
Now the output of this code changes to:
Elements:
Begin
1
2
3
Again:
Begin
1
2
3
This example highlights the key difference between a reactive stream and a channel. A reactive stream is a higher-order functional concept. While a channel is a stream of elements, a reactive stream defines a recipe for how the stream of elements is produced. It becomes the actual stream of elements only when it is collected. Each collector may receive the same or a different stream of elements, depending on how the corresponding implementation of Publisher works.
The publish coroutine builder used in the above example does not launch a coroutine, but every collect invocation does. There are two of them here and that is why we see "Begin" printed twice.
In Rx lingo, this kind of publisher is called cold. Many standard Rx operators produce cold streams, too. We can collect them from a coroutine, and every collector gets the same stream of elements.
Note that we can replicate the same behaviour that we saw with channels by using the Rx publish operator together with its connect method.
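For illustration, here is a minimal sketch of that approach (it is not part of the guide's example files, so treat the setup as an assumption): publish turns the cold flowable into a connectable one, connect starts the emission, and a subscriber that arrives after the stream has completed receives nothing, much like the second consumeEach on the closed channel above.
import io.reactivex.Flowable

fun main() {
    val connectable = Flowable.range(1, 3)
        .doOnSubscribe { println("Begin") } // runs once per upstream subscription
        .publish() // ConnectableFlowable: does not subscribe upstream until connect()
    connectable.subscribe { println("First: $it") } // registered before the connection
    connectable.connect() // subscribes upstream once; "Begin" and 1..3 go to the first subscriber
    connectable.subscribe { println("Late: $it") } // too late: this run of the stream is already over
}
It should print "Begin" exactly once and deliver the elements only to the first subscriber, replicating the hot, consume-once behaviour of the channel example.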
In the second example from the previous section, source.collect { ... } was used to collect all elements. Instead, we can open a channel using openSubscription and iterate over it. In this way, we can have finer-grained control over our iteration (using break, for example), as shown below:
fun main() = runBlocking<Unit> {
    val source = Flowable.range(1, 5) // a range of five numbers
        .doOnSubscribe { println("OnSubscribe") } // provide some insight
        .doOnComplete { println("OnComplete") }   // ...
        .doFinally { println("Finally") }         // ... into what's going on
    var cnt = 0
    source.openSubscription().consume { // open channel to the source
        for (x in this) { // iterate over the channel to receive elements from it
            println(x)
            if (++cnt >= 3) break // break when 3 elements are printed
        }
        // Note: `consume` cancels the channel when this block of code is complete
    }
}
You can get full code here.
It produces the following output:
OnSubscribe
1
2
3
Finally
With an explicit openSubscription we should cancel the corresponding subscription to unsubscribe from the source, but there is no need to call cancel explicitly -- consume does that for us under the hood. The installed doFinally listener prints "Finally" to confirm that the subscription is actually being closed. Note that "OnComplete" is never printed because we did not consume all of the elements.
We do not need to use an explicit cancel either if we collect all the elements:
fun main() = runBlocking<Unit> {
    val source = Flowable.range(1, 5) // a range of five numbers
        .doOnSubscribe { println("OnSubscribe") } // provide some insight
        .doOnComplete { println("OnComplete") }   // ...
        .doFinally { println("Finally") }         // ... into what's going on
    // collect the source fully
    source.collect { println(it) }
}
You can get full code here.
We get the following output:
OnSubscribe
1
2
3
OnComplete
Finally
4
5
Notice how "OnComplete" and "Finally" are printed before the lasts elements "4" and "5".
It happens because our main function in this example is a coroutine that we start with the runBlocking coroutine builder. Our main coroutine receives on the flowable using the source.collect { ... } expression. The main coroutine is suspended while it waits for the source to emit an item. When the last items are emitted by Flowable.range(1, 5), it resumes the main coroutine, which gets dispatched onto the main thread to print these last elements at a later point in time, while the source completes and prints "Finally".
Backpressure is one of the most interesting and complex aspects of reactive streams. Coroutines can suspend and they provide a natural answer to handling backpressure.
In RxJava 2.x, the backpressure-capable class is called Flowable. In the following example, we use the rxFlowable coroutine builder from the kotlinx-coroutines-rx2 module to define a flowable that sends three integers from 1 to 3. It prints a message to the output before the invocation of the suspending send function, so that we can study how it operates. The integers are generated in the context of the main thread, but the subscription is shifted to another thread using the Rx observeOn operator with a buffer of size 1. The subscriber is slow. It takes 500 ms to process each item, which is simulated using Thread.sleep.
fun main() = runBlocking<Unit> {
    // coroutine -- fast producer of elements in the context of the main thread
    val source = rxFlowable {
        for (x in 1..3) {
            send(x) // this is a suspending function
            println("Sent $x") // print after successfully sent item
        }
    }
    // subscribe on another thread with a slow subscriber using Rx
    source
        .observeOn(Schedulers.io(), false, 1) // specify buffer size of 1 item
        .doOnComplete { println("Complete") }
        .subscribe { x ->
            Thread.sleep(500) // 500ms to process each item
            println("Processed $x")
        }
    delay(2000) // suspend the main thread for a few seconds
}
You can get full code here.
The output of this code nicely illustrates how backpressure works with coroutines:
Sent 1
Processed 1
Sent 2
Processed 2
Sent 3
Processed 3
Complete
We see here how the producer coroutine puts the first element in the buffer and is suspended while trying to send another one. Only after the consumer processes the first item does the producer resume and send the second one, and so on.
RxJava has a concept of Subject which is an object that effectively broadcasts elements to all its subscribers. The matching concept in the coroutines world is called a BroadcastChannel. There is a variety of subjects in Rx with BehaviorSubject being the one used to manage state:
fun main() {
    val subject = BehaviorSubject.create<String>()
    subject.onNext("one")
    subject.onNext("two") // updates the state of BehaviorSubject, "one" value is lost
    // now subscribe to this subject and print everything
    subject.subscribe(System.out::println)
    subject.onNext("three")
    subject.onNext("four")
}
You can get full code here.
This code prints the current state of the subject on subscription and all its further updates:
two
three
four
You can subscribe to subjects from a coroutine just as with any other reactive stream:
fun main() = runBlocking<Unit> {
    val subject = BehaviorSubject.create<String>()
    subject.onNext("one")
    subject.onNext("two")
    // now launch a coroutine to print everything
    GlobalScope.launch(Dispatchers.Unconfined) { // launch coroutine in unconfined context
        subject.collect { println(it) }
    }
    subject.onNext("three")
    subject.onNext("four")
}
You can get full code here.
The result is the same:
two
three
four
Here we use the Dispatchers.Unconfined coroutine context to launch a consuming coroutine with the same behavior as a subscription in Rx. It basically means that the launched coroutine is going to be immediately executed in the same thread that is emitting elements. Contexts are covered in more detail in a separate section.
The advantage of coroutines is that it is easy to get conflation behavior for single-threaded UI updates. A typical UI application does not need to react to every state change. Only the most recent state is relevant. A sequence of back-to-back updates to the application state needs to get reflected in the UI only once, as soon as the UI thread is free. For the following example, we are going to simulate this by launching a consuming coroutine in the context of the main thread and using the yield function to simulate a break in the sequence of updates and to release the main thread:
fun main() = runBlocking<Unit> {
    val subject = BehaviorSubject.create<String>()
    subject.onNext("one")
    subject.onNext("two")
    // now launch a coroutine to print the most recent update
    launch { // use the context of the main thread for a coroutine
        subject.collect { println(it) }
    }
    subject.onNext("three")
    subject.onNext("four")
    yield() // yield the main thread to the launched coroutine <--- HERE
    subject.onComplete() // now complete the subject's sequence to cancel the consumer, too
}
You can get full code here.
Now the coroutine processes (prints) only the most recent update:
four
The corresponding behavior in the pure coroutines world is implemented by ConflatedBroadcastChannel that provides the same logic on top of coroutine channels directly, without going through the bridge to the reactive streams:
fun main() = runBlocking<Unit> {
    val broadcast = ConflatedBroadcastChannel<String>()
    broadcast.offer("one")
    broadcast.offer("two")
    // now launch a coroutine to print the most recent update
    launch { // use the context of the main thread for a coroutine
        broadcast.consumeEach { println(it) }
    }
    broadcast.offer("three")
    broadcast.offer("four")
    yield() // yield the main thread to the launched coroutine
    broadcast.close() // now close the broadcast channel to cancel the consumer, too
}
You can get full code here.
It produces the same output as the previous example based on BehaviorSubject:
four
Another implementation of BroadcastChannel is ArrayBroadcastChannel, with an array-based buffer of a specified capacity. It can be created with BroadcastChannel(capacity). It delivers every event to every subscriber as soon as their corresponding subscriptions are opened. It corresponds to PublishSubject in Rx. The capacity of the buffer in the constructor of ArrayBroadcastChannel controls the number of elements that can be sent before the sender is suspended waiting for a receiver to receive those elements.
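As an illustration of this broadcasting behaviour, here is a small self-contained sketch (not one of the guide's example files) that opens several subscriptions before sending:
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking<Unit> {
    val broadcast = BroadcastChannel<Int>(1) // array-based buffer with capacity 1
    // open both subscriptions BEFORE sending, so that neither of them misses an element
    val first = broadcast.openSubscription()
    val second = broadcast.openSubscription()
    launch { first.consumeEach { println("First received $it") } }
    launch { second.consumeEach { println("Second received $it") } }
    for (x in 1..3) broadcast.send(x) // suspends while the one-element buffer is still full
    broadcast.close() // completes both subscriptions
}
Both subscribers should print every element, and the sender suspends whenever the one-element buffer is still occupied, which is exactly the capacity-driven behaviour described above.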
Full-featured reactive stream libraries, like Rx, come with a very large set of operators to create, transform, combine and otherwise process the corresponding streams. Creating your own operators with support for back-pressure is notoriously difficult.
Coroutines and channels are designed to provide an opposite experience. There are no built-in operators, but processing streams of elements is extremely simple and back-pressure is supported automatically without you having to explicitly think about it.
This section shows a coroutine-based implementation of several reactive stream operators.
Let's roll our own implementation of the range operator for the reactive streams' Publisher interface. The asynchronous clean-slate implementation of this operator for reactive streams is explained in this blog post. It takes a lot of code. Here is the corresponding code with coroutines:
fun CoroutineScope.range(context: CoroutineContext, start: Int, count: Int) = publish<Int>(context) {
    for (x in start until start + count) send(x)
}
Here, CoroutineScope and context are used instead of an Executor, and all the backpressure aspects are taken care of by the coroutines machinery. Note that this implementation depends only on the small reactive streams library that defines the Publisher interface and its friends.
Using it from a coroutine is straightforward:
fun main() = runBlocking<Unit> {
    // Range inherits parent job from runBlocking, but overrides dispatcher with Dispatchers.Default
    range(Dispatchers.Default, 1, 5).collect { println(it) }
}
You can get full code here.
The result of this code is quite expected:
1
2
3
4
5
Reactive operators like filter and map are trivial to implement with coroutines. For a bit of a challenge and showcase, let us combine them into a single fusedFilterMap operator:
fun <T, R> Publisher<T>.fusedFilterMap(
    context: CoroutineContext, // the context to execute this coroutine in
    predicate: (T) -> Boolean, // the filter predicate
    mapper: (T) -> R           // the mapper function
) = publish<R>(context) {
    collect {                  // collect the source stream
        if (predicate(it))     // filter part
            send(mapper(it))   // map part
    }
}
Using range from the previous example, we can test our fusedFilterMap by filtering for even numbers and mapping them to strings:
fun main() = runBlocking<Unit> {
    range(1, 5)
        .fusedFilterMap(Dispatchers.Unconfined, { it % 2 == 0 }, { "$it is even" })
        .collect { println(it) } // print all the resulting strings
}
You can get full code here.
It is not hard to see that the result is going to be:
2 is even
4 is even
Let's implement our own version of the takeUntil operator. It is quite tricky, as subscriptions to two streams need to be tracked and managed. We need to relay all the elements from the source stream until the other stream either completes or emits anything. However, we have the select expression to rescue us in the coroutines implementation:
fun <T, U> Publisher<T>.takeUntil(context: CoroutineContext, other: Publisher<U>) = publish<T>(context) {
    this@takeUntil.openSubscription().consume { // explicitly open channel to Publisher<T>
        val current = this
        other.openSubscription().consume { // explicitly open channel to Publisher<U>
            val other = this
            whileSelect {
                other.onReceive { false }            // bail out on any received element from `other`
                current.onReceive { send(it); true } // resend element from this channel and continue
            }
        }
    }
}
This code uses whileSelect as a nicer shortcut to the while (select {...}) {} loop, and Kotlin's consume expressions to close the channels on exit, which unsubscribes from the corresponding publishers.
The following hand-written combination of range with interval is used for testing. It is coded using a publish coroutine builder (its pure-Rx implementation is shown in later sections):
fun CoroutineScope.rangeWithInterval(time: Long, start: Int, count: Int) = publish<Int> {
    for (x in start until start + count) {
        delay(time) // wait before sending each number
        send(x)
    }
}
The following code shows how takeUntil works:
fun main() = runBlocking<Unit> {
    val slowNums = rangeWithInterval(200, 1, 10) // numbers with 200ms interval
    val stop = rangeWithInterval(500, 1, 10)     // the first one after 500ms
    slowNums.takeUntil(Dispatchers.Unconfined, stop).collect { println(it) } // let's test it
}
You can get full code here.
It produces the following output:
1
2
There are always at least two ways for processing multiple streams of data with coroutines. One way involving select was shown in the previous example. The other way is just to launch multiple coroutines. Let us implement the merge operator using the latter approach:
fun <T> Publisher<Publisher<T>>.merge(context: CoroutineContext) = publish<T>(context) {
    collect { pub ->                 // for each publisher collected
        launch {                     // launch a child coroutine
            pub.collect { send(it) } // resend all elements from this publisher
        }
    }
}
Notice that all the coroutines that are
being launched here are the children of the publish
coroutine and will get cancelled when the publish
coroutine is cancelled or is otherwise completed.
Moreover, since the parent coroutine waits until all the children are complete, this implementation fully
merges all the received streams.
For a test, let us start with the rangeWithInterval
function from the previous example and write a
producer that sends its results twice with some delay:
fun CoroutineScope.testPub() = publish<Publisher<Int>> {
    send(rangeWithInterval(250, 1, 4)) // number 1 at 250ms, 2 at 500ms, 3 at 750ms, 4 at 1000ms
    delay(100) // wait for 100 ms
    send(rangeWithInterval(500, 11, 3)) // number 11 at 600ms, 12 at 1100ms, 13 at 1600ms
    delay(1100) // wait for 1.1s - done in 1.2 sec after start
}
The test code is to use merge on testPub and to display the results:
fun main() = runBlocking<Unit> {
    testPub().merge(Dispatchers.Unconfined).collect { println(it) } // print the whole stream
}
You can get full code here.
And the results should be:
1
2
11
3
4
12
13
All the example operators that are shown in the previous section have an explicit CoroutineContext parameter. In the Rx world it roughly corresponds to a Scheduler.
The following example shows the basics of threading context management with Rx. Here rangeWithIntervalRx is an implementation of the rangeWithInterval function using the Rx zip, range, and interval operators.
fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int): Flowable<Int> =
    Flowable.zip(
        Flowable.range(start, count),
        Flowable.interval(time, TimeUnit.MILLISECONDS, scheduler),
        BiFunction { x, _ -> x })

fun main() {
    rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3)
        .subscribe { println("$it on thread ${Thread.currentThread().name}") }
    Thread.sleep(1000)
}
You can get full code here.
We are explicitly passing the Schedulers.computation() scheduler to our rangeWithIntervalRx operator, and it is going to be executed in the Rx computation thread pool. The output is going to be similar to the following one:
1 on thread RxComputationThreadPool-1
2 on thread RxComputationThreadPool-1
3 on thread RxComputationThreadPool-1
In the world of coroutines, Schedulers.computation() roughly corresponds to Dispatchers.Default, so the previous example is similar to the following one:
fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish<Int>(context) {
    for (x in start until start + count) {
        delay(time) // wait before sending each number
        send(x)
    }
}

fun main() {
    Flowable.fromPublisher(rangeWithInterval(Dispatchers.Default, 100, 1, 3))
        .subscribe { println("$it on thread ${Thread.currentThread().name}") }
    Thread.sleep(1000)
}
You can get full code here.
The produced output is going to be similar to:
1 on thread ForkJoinPool.commonPool-worker-1
2 on thread ForkJoinPool.commonPool-worker-1
3 on thread ForkJoinPool.commonPool-worker-1
Here we've used the Rx subscribe operator, which does not have its own scheduler and operates on the same thread as the publisher -- on a default shared pool of threads in this example.
In Rx you use special operators to modify the threading context for operations in the chain. You can find some good guides about them if you are not familiar with them. For example, there is the observeOn operator. Let us modify the previous example to observe using Schedulers.computation():
fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish<Int>(context) {
    for (x in start until start + count) {
        delay(time) // wait before sending each number
        send(x)
    }
}

fun main() {
    Flowable.fromPublisher(rangeWithInterval(Dispatchers.Default, 100, 1, 3))
        .observeOn(Schedulers.computation()) // <-- THIS LINE IS ADDED
        .subscribe { println("$it on thread ${Thread.currentThread().name}") }
    Thread.sleep(1000)
}
You can get full code here.
Here is the difference in the output; notice "RxComputationThreadPool":
1 on thread RxComputationThreadPool-1
2 on thread RxComputationThreadPool-1
3 on thread RxComputationThreadPool-1
A coroutine is always working in some context. For example, let us start a coroutine in the main thread with runBlocking and iterate over the result of the Rx version of the rangeWithIntervalRx operator, instead of using the Rx subscribe operator:
fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int): Flowable<Int> =
    Flowable.zip(
        Flowable.range(start, count),
        Flowable.interval(time, TimeUnit.MILLISECONDS, scheduler),
        BiFunction { x, _ -> x })

fun main() = runBlocking<Unit> {
    rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3)
        .collect { println("$it on thread ${Thread.currentThread().name}") }
}
You can get full code here.
The resulting messages are going to be printed in the main thread:
1 on thread main
2 on thread main
3 on thread main
Most Rx operators do not have any specific thread (scheduler) associated with them and work in whatever thread they happen to be invoked in. We've seen this in the example with the subscribe operator in the threads with Rx section.
In the world of coroutines, Dispatchers.Unconfined context serves a similar role. Let us modify our previous example,
but instead of iterating over the source Flowable
from the runBlocking
coroutine that is confined
to the main thread, we launch a new coroutine in the Dispatchers.Unconfined
context, while the main coroutine
simply waits for its completion using Job.join:
fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int): Flowable<Int> =
    Flowable.zip(
        Flowable.range(start, count),
        Flowable.interval(time, TimeUnit.MILLISECONDS, scheduler),
        BiFunction { x, _ -> x })

fun main() = runBlocking<Unit> {
    val job = launch(Dispatchers.Unconfined) { // launch a new coroutine in Unconfined context (without its own thread pool)
        rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3)
            .collect { println("$it on thread ${Thread.currentThread().name}") }
    }
    job.join() // wait for our coroutine to complete
}
You can get full code here.
Now, the output shows that the code of the coroutine is executing in the Rx computation thread pool, just
like our initial example using the Rx subscribe
operator.
1 on thread RxComputationThreadPool-1
2 on thread RxComputationThreadPool-1
3 on thread RxComputationThreadPool-1
Note that the Dispatchers.Unconfined context should be used with care. It may improve the overall performance on certain tests, due to the increased stack-locality of operations and less scheduling overhead, but it also produces deeper stacks and makes it harder to reason about asynchronicity of the code that is using it.
If a coroutine sends an element to a channel, then the thread that invoked the send may start executing the code of a consuming coroutine that uses the Dispatchers.Unconfined dispatcher. The original producer coroutine that invoked send is paused until the unconfined consumer coroutine hits its next suspension point. This is very similar to a lock-step single-threaded onNext execution in the Rx world in the absence of thread-shifting operators. It is a normal default for Rx, because operators are usually doing very small chunks of work and you have to combine many operators for complex processing. However, this is unusual with coroutines, where you can have arbitrarily complex processing in a coroutine. Usually, you only need to chain stream-processing coroutines for complex pipelines with fan-in and fan-out between multiple worker coroutines.
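To make this lock-step hand-off concrete, here is a small self-contained sketch (an illustration, not one of the guide's examples) with an unconfined consumer of a plain rendezvous channel; the producer's thread ends up running the consumer's code between its own sends:
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking<Unit> {
    val channel = Channel<Int>() // rendezvous channel, no buffer
    // the unconfined consumer resumes in whatever thread resumes it after a suspension
    launch(Dispatchers.Unconfined) {
        for (x in channel) println("Received $x on ${Thread.currentThread().name}")
    }
    yield() // let the consumer start and suspend on the empty channel first
    // the producer runs on the default thread pool
    launch(Dispatchers.Default) {
        for (x in 1..3) {
            channel.send(x) // resumes the unconfined consumer right here, in this thread
            println("Sent $x on ${Thread.currentThread().name}")
        }
        channel.close()
    }
}
Each "Received x" line should appear on the producer's default dispatcher thread, before the matching "Sent x" line, because send hands the thread over to the unconfined consumer until it suspends again.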