diff --git a/elmslie-core/src/main/kotlin/money/vivid/elmslie/core/store/Actor.kt b/elmslie-core/src/main/kotlin/money/vivid/elmslie/core/store/Actor.kt index c59efce6..62db5663 100644 --- a/elmslie-core/src/main/kotlin/money/vivid/elmslie/core/store/Actor.kt +++ b/elmslie-core/src/main/kotlin/money/vivid/elmslie/core/store/Actor.kt @@ -2,10 +2,19 @@ package money.vivid.elmslie.core.store import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.catch +import kotlinx.coroutines.flow.emptyFlow +import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.mapNotNull +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import money.vivid.elmslie.core.switcher.Switcher +import kotlin.reflect.KClass abstract class Actor { + private val switchers = mutableMapOf, Switcher>() + private val mutex = Mutex() + /** * Executes a command. This method is performed on the [Dispatchers.IO] * [kotlinx.coroutines.Dispatchers.IO] which is set by ElmslieConfig.ioDispatchers() @@ -17,4 +26,21 @@ abstract class Actor { errorMapper: (error: Throwable) -> Event? = { null }, ) = mapNotNull { eventMapper(it) } .catch { errorMapper(it)?.let { event -> emit(event) } ?: throw it } + + protected fun Flow.asSwitchFlow(command: Command, delayMillis: Long = 0): Flow { + return flow { + val switcher = mutex.withLock { + switchers.getOrPut(command::class) { + Switcher() + } + } + switcher.switch(delayMillis) { this@asSwitchFlow }.collect { + emit(it) + } + } + } + + protected fun cancelSwitchFlow(command: KClass): Flow { + return switchers[command]?.cancel() ?: emptyFlow() + } } diff --git a/samples/coroutines-loader/src/main/kotlin/money/vivid/elmslie/samples/coroutines/timer/elm/TimerActor.kt b/samples/coroutines-loader/src/main/kotlin/money/vivid/elmslie/samples/coroutines/timer/elm/TimerActor.kt index ce9c076a..440b6021 100644 --- a/samples/coroutines-loader/src/main/kotlin/money/vivid/elmslie/samples/coroutines/timer/elm/TimerActor.kt +++ b/samples/coroutines-loader/src/main/kotlin/money/vivid/elmslie/samples/coroutines/timer/elm/TimerActor.kt @@ -1,28 +1,26 @@ package money.vivid.elmslie.samples.coroutines.timer.elm import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flow import money.vivid.elmslie.core.store.Actor -import money.vivid.elmslie.core.switcher.Switcher internal object TimerActor : Actor() { - private val switcher = Switcher() - override fun execute(command: Command) = when (command) { - is Command.Start -> - switcher - .switch { secondsFlow() } - .mapEvents( - eventMapper = { Event.OnTimeTick }, - errorMapper = { Event.OnTimeError(it) }, - ) - is Command.Stop -> switcher.cancel() + is Command.Start -> secondsFlow() + .asSwitchFlow(command) + .mapEvents( + eventMapper = { Event.OnTimeTick }, + errorMapper = { Event.OnTimeError(it) }, + ) + + is Command.Stop -> cancelSwitchFlow(Command.Start::class) } @Suppress("MagicNumber") - private fun secondsFlow() = flow { + private fun secondsFlow(): Flow = flow { repeat(10) { delay(1000) emit(it)