From afddb2800b4fc001da9c31857029874a1898f0ab Mon Sep 17 00:00:00 2001 From: Dmitriy Berdnikov Date: Mon, 5 Feb 2024 15:45:49 +0300 Subject: [PATCH 1/2] make switch extension for Flow inside Actors --- .../money/vivid/elmslie/core/store/Actor.kt | 21 +++++++++++++++++++ .../coroutines/timer/elm/TimerActor.kt | 21 +++++++++++-------- 2 files changed, 33 insertions(+), 9 deletions(-) diff --git a/elmslie-core/src/main/kotlin/money/vivid/elmslie/core/store/Actor.kt b/elmslie-core/src/main/kotlin/money/vivid/elmslie/core/store/Actor.kt index c59efce6..b36a7957 100644 --- a/elmslie-core/src/main/kotlin/money/vivid/elmslie/core/store/Actor.kt +++ b/elmslie-core/src/main/kotlin/money/vivid/elmslie/core/store/Actor.kt @@ -2,10 +2,18 @@ package money.vivid.elmslie.core.store import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.catch +import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.mapNotNull +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import money.vivid.elmslie.core.switcher.Switcher +import kotlin.reflect.KClass abstract class Actor { + protected val switchers = mutableMapOf, Switcher>() + private val mutex = Mutex() + /** * Executes a command. This method is performed on the [Dispatchers.IO] * [kotlinx.coroutines.Dispatchers.IO] which is set by ElmslieConfig.ioDispatchers() @@ -17,4 +25,17 @@ abstract class Actor { errorMapper: (error: Throwable) -> Event? = { null }, ) = mapNotNull { eventMapper(it) } .catch { errorMapper(it)?.let { event -> emit(event) } ?: throw it } + + protected fun Flow.switchOnEach(command: Command, delayMillis: Long = 0): Flow { + return flow { + val switcher = mutex.withLock { + switchers.getOrPut(command::class) { + Switcher() + } + } + switcher.switch(delayMillis) { this@switchOnEach }.collect { + emit(it) + } + } + } } diff --git a/samples/coroutines-loader/src/main/kotlin/money/vivid/elmslie/samples/coroutines/timer/elm/TimerActor.kt b/samples/coroutines-loader/src/main/kotlin/money/vivid/elmslie/samples/coroutines/timer/elm/TimerActor.kt index ce9c076a..fbb4ce12 100644 --- a/samples/coroutines-loader/src/main/kotlin/money/vivid/elmslie/samples/coroutines/timer/elm/TimerActor.kt +++ b/samples/coroutines-loader/src/main/kotlin/money/vivid/elmslie/samples/coroutines/timer/elm/TimerActor.kt @@ -1,6 +1,7 @@ package money.vivid.elmslie.samples.coroutines.timer.elm import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flow import money.vivid.elmslie.core.store.Actor import money.vivid.elmslie.core.switcher.Switcher @@ -11,18 +12,20 @@ internal object TimerActor : Actor() { override fun execute(command: Command) = when (command) { - is Command.Start -> - switcher - .switch { secondsFlow() } - .mapEvents( - eventMapper = { Event.OnTimeTick }, - errorMapper = { Event.OnTimeError(it) }, - ) - is Command.Stop -> switcher.cancel() + is Command.Start -> secondsFlow() + .switchOnEach(command) + .mapEvents( + eventMapper = { Event.OnTimeTick }, + errorMapper = { Event.OnTimeError(it) }, + ) + + is Command.Stop -> switchers.getOrPut(Command.Start::class) { + Switcher() + }.cancel() } @Suppress("MagicNumber") - private fun secondsFlow() = flow { + private fun secondsFlow(): Flow = flow { repeat(10) { delay(1000) emit(it) From f1cdc158fc09e02c07d6ea8293f2ae0895b63bad Mon Sep 17 00:00:00 2001 From: Dmitriy Berdnikov Date: Mon, 12 Feb 2024 11:52:22 +0300 Subject: [PATCH 2/2] cancel flow method; rename extension --- .../kotlin/money/vivid/elmslie/core/store/Actor.kt | 11 ++++++++--- .../samples/coroutines/timer/elm/TimerActor.kt | 9 ++------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/elmslie-core/src/main/kotlin/money/vivid/elmslie/core/store/Actor.kt b/elmslie-core/src/main/kotlin/money/vivid/elmslie/core/store/Actor.kt index b36a7957..62db5663 100644 --- a/elmslie-core/src/main/kotlin/money/vivid/elmslie/core/store/Actor.kt +++ b/elmslie-core/src/main/kotlin/money/vivid/elmslie/core/store/Actor.kt @@ -2,6 +2,7 @@ package money.vivid.elmslie.core.store import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.catch +import kotlinx.coroutines.flow.emptyFlow import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.mapNotNull import kotlinx.coroutines.sync.Mutex @@ -11,7 +12,7 @@ import kotlin.reflect.KClass abstract class Actor { - protected val switchers = mutableMapOf, Switcher>() + private val switchers = mutableMapOf, Switcher>() private val mutex = Mutex() /** @@ -26,16 +27,20 @@ abstract class Actor { ) = mapNotNull { eventMapper(it) } .catch { errorMapper(it)?.let { event -> emit(event) } ?: throw it } - protected fun Flow.switchOnEach(command: Command, delayMillis: Long = 0): Flow { + protected fun Flow.asSwitchFlow(command: Command, delayMillis: Long = 0): Flow { return flow { val switcher = mutex.withLock { switchers.getOrPut(command::class) { Switcher() } } - switcher.switch(delayMillis) { this@switchOnEach }.collect { + switcher.switch(delayMillis) { this@asSwitchFlow }.collect { emit(it) } } } + + protected fun cancelSwitchFlow(command: KClass): Flow { + return switchers[command]?.cancel() ?: emptyFlow() + } } diff --git a/samples/coroutines-loader/src/main/kotlin/money/vivid/elmslie/samples/coroutines/timer/elm/TimerActor.kt b/samples/coroutines-loader/src/main/kotlin/money/vivid/elmslie/samples/coroutines/timer/elm/TimerActor.kt index fbb4ce12..440b6021 100644 --- a/samples/coroutines-loader/src/main/kotlin/money/vivid/elmslie/samples/coroutines/timer/elm/TimerActor.kt +++ b/samples/coroutines-loader/src/main/kotlin/money/vivid/elmslie/samples/coroutines/timer/elm/TimerActor.kt @@ -4,24 +4,19 @@ import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flow import money.vivid.elmslie.core.store.Actor -import money.vivid.elmslie.core.switcher.Switcher internal object TimerActor : Actor() { - private val switcher = Switcher() - override fun execute(command: Command) = when (command) { is Command.Start -> secondsFlow() - .switchOnEach(command) + .asSwitchFlow(command) .mapEvents( eventMapper = { Event.OnTimeTick }, errorMapper = { Event.OnTimeError(it) }, ) - is Command.Stop -> switchers.getOrPut(Command.Start::class) { - Switcher() - }.cancel() + is Command.Stop -> cancelSwitchFlow(Command.Start::class) } @Suppress("MagicNumber")