Skip to content

Commit

Permalink
make switch extension for Flow inside Actors
Browse files Browse the repository at this point in the history
  • Loading branch information
Dmitriy Berdnikov committed Feb 5, 2024
1 parent fec3f6f commit 85d2ab1
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@ package money.vivid.elmslie.core.store
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.mapNotNull
import money.vivid.elmslie.core.switcher.Switcher
import kotlin.reflect.KClass

abstract class Actor<Command : Any, Event : Any> {

private val switchers = mutableMapOf<KClass<out Any>, Switcher>()

/**
* Executes a command. This method is performed on the [Dispatchers.IO]
* [kotlinx.coroutines.Dispatchers.IO] which is set by ElmslieConfig.ioDispatchers()
Expand All @@ -17,4 +21,24 @@ abstract class Actor<Command : Any, Event : Any> {
errorMapper: (error: Throwable) -> Event? = { null },
) = mapNotNull { eventMapper(it) }
.catch { errorMapper(it)?.let { event -> emit(event) } ?: throw it }

protected fun <T : Any> switchFlow(
command: Command,
delayMillis: Long = 0,
flow: () -> Flow<T>
): Flow<T> {
val switcher = switchers.getOrPut(command::class) {
Switcher()
}
return switcher.switch(delayMillis) {
flow.invoke()
}
}

protected fun <T : Any, Command : Any> Flow<T>.switchOnEach(command: Command, delayMillis: Long = 0): Flow<T> {
val switcher = switchers.getOrPut(command::class) {
Switcher()
}
return switcher.switch(delayMillis) { this }
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package money.vivid.elmslie.samples.coroutines.timer.elm

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import money.vivid.elmslie.core.store.Actor
import money.vivid.elmslie.core.switcher.Switcher
Expand All @@ -11,18 +12,24 @@ internal object TimerActor : Actor<Command, Event>() {

override fun execute(command: Command) =
when (command) {
is Command.Start ->
switcher
.switch { secondsFlow() }
.mapEvents(
eventMapper = { Event.OnTimeTick },
errorMapper = { Event.OnTimeError(it) },
)
// is Command.Start -> switchFlow(command) {
// secondsFlow()
// }.mapEvents(
// eventMapper = { Event.OnTimeTick },
// errorMapper = { Event.OnTimeError(it) },
// )
is Command.Start -> secondsFlow()
.switchOnEach(command)
.mapEvents(
eventMapper = { Event.OnTimeTick },
errorMapper = { Event.OnTimeError(it) },
)

is Command.Stop -> switcher.cancel()
}

@Suppress("MagicNumber")
private fun secondsFlow() = flow {
private fun secondsFlow(): Flow<Int> = flow {
repeat(10) {
delay(1000)
emit(it)
Expand Down

0 comments on commit 85d2ab1

Please sign in to comment.