diff --git a/kotlin/rxjava_to_flow/Streams.kt b/kotlin/rxjava_to_flow/Streams.kt index f48ff48..16a0d69 100644 --- a/kotlin/rxjava_to_flow/Streams.kt +++ b/kotlin/rxjava_to_flow/Streams.kt @@ -1,19 +1,19 @@ package co.gitar -import io.reactivex.rxjava3.core.Observable - object Streams { - fun stream1(): Observable { - return Observable.just("a", "b", "c", "d", "e", "f") + fun stream1(): Flow { + return flowOf("a", "b", "c", "d", "e", "f") } - fun stream2(): Observable { - return Observable.fromIterable(listOf("x", "y", "z")) + fun stream2(): Flow { + return flowOf("x", "y", "z") } - fun stream3(): Observable { - return Observable.fromCallable { call() } - .delay(10, TimeUnit.MILLISECONDS) + fun stream3(): Flow { + return flow { + delay(10) + emit(call()) + } } private fun call(): String { diff --git a/kotlin/rxjava_to_flow/Test.kt b/kotlin/rxjava_to_flow/Test.kt index 598fe91..e320ed1 100644 --- a/kotlin/rxjava_to_flow/Test.kt +++ b/kotlin/rxjava_to_flow/Test.kt @@ -1,7 +1,5 @@ package co.gitar -import co.gitar.Streams -import io.reactivex.rxjava3.core.Observable import kotlin.test.Test import org.junit.jupiter.api.Assertions.assertEquals @@ -9,47 +7,52 @@ class Test { @Test fun testSimple() { - val observable = Observable.just("Hello World") + val observable = flowOf("Hello World") val res = mutableListOf() - observable.subscribe { res += it } + runBlocking { observable.collect { res += it } } assertEquals(listOf("Hello World"), res) } @Test fun testMBasic() { - Streams.stream1().map { it.first().code } - .filter { it > 98 } - .take(3) - .subscribe { println(it) } + runBlocking { + Streams.stream1() + .map { it.first().code } + .filter { it > 98 } + .take(3) + .collect { println(it) } + } } @Test fun testZipWith() { val res = mutableListOf() - Streams.stream1().zipWith(Streams.stream2(), Streams::concat).subscribe { res += it } + runBlocking { + Streams.stream1().zip(Streams.stream2(), Streams::concat).collect { res += it } + } assertEquals(listOf("ax", "by", "cz"), res) } @Test fun testStartWith() { val res = mutableListOf() - Streams.stream1() - .startWith(Streams.stream2()) - .subscribe { res += it } + runBlocking { + Streams.stream1().onStart { emitAll(Streams.stream2()) }.collect { res += it } + } assertEquals(listOf("x", "y", "z", "a", "b", "c", "d", "e", "f"), res) } @Test fun testMergeWith() { val res = mutableListOf() - Streams.stream1().mergeWith(Streams.stream2()).subscribe { res += it } + runBlocking { merge(Streams.stream1(), Streams.stream2()).collect { res += it } } assertEquals(listOf("a", "b", "c", "d", "e", "f", "x", "y", "z"), res) } @Test fun testFlatMap() { val res = mutableListOf() - Streams.stream1().flatMap { Streams.stream2() }.subscribe { res += it } + runBlocking { Streams.stream1().flatMapConcat { Streams.stream2() }.collect { res += it } } assertEquals( listOf( "x",