Skip to content

Commit

Permalink
make switch extension for Flow inside Actors
Browse files Browse the repository at this point in the history
  • Loading branch information
Dmitriy Berdnikov committed Feb 12, 2024
1 parent fec3f6f commit afddb28
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,18 @@ package money.vivid.elmslie.core.store

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import money.vivid.elmslie.core.switcher.Switcher
import kotlin.reflect.KClass

abstract class Actor<Command : Any, Event : Any> {

protected val switchers = mutableMapOf<KClass<out Any>, Switcher>()
private val mutex = Mutex()

/**
* Executes a command. This method is performed on the [Dispatchers.IO]
* [kotlinx.coroutines.Dispatchers.IO] which is set by ElmslieConfig.ioDispatchers()
Expand All @@ -17,4 +25,17 @@ abstract class Actor<Command : Any, Event : Any> {
errorMapper: (error: Throwable) -> Event? = { null },
) = mapNotNull { eventMapper(it) }
.catch { errorMapper(it)?.let { event -> emit(event) } ?: throw it }

protected fun <T : Any, Command : Any> Flow<T>.switchOnEach(command: Command, delayMillis: Long = 0): Flow<T> {
return flow {
val switcher = mutex.withLock {
switchers.getOrPut(command::class) {
Switcher()
}
}
switcher.switch(delayMillis) { this@switchOnEach }.collect {
emit(it)
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package money.vivid.elmslie.samples.coroutines.timer.elm

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import money.vivid.elmslie.core.store.Actor
import money.vivid.elmslie.core.switcher.Switcher
Expand All @@ -11,18 +12,20 @@ internal object TimerActor : Actor<Command, Event>() {

override fun execute(command: Command) =
when (command) {
is Command.Start ->
switcher
.switch { secondsFlow() }
.mapEvents(
eventMapper = { Event.OnTimeTick },
errorMapper = { Event.OnTimeError(it) },
)
is Command.Stop -> switcher.cancel()
is Command.Start -> secondsFlow()
.switchOnEach(command)
.mapEvents(
eventMapper = { Event.OnTimeTick },
errorMapper = { Event.OnTimeError(it) },
)

is Command.Stop -> switchers.getOrPut(Command.Start::class) {
Switcher()
}.cancel()
}

@Suppress("MagicNumber")
private fun secondsFlow() = flow {
private fun secondsFlow(): Flow<Int> = flow {
repeat(10) {
delay(1000)
emit(it)
Expand Down

0 comments on commit afddb28

Please sign in to comment.