Skip to content

Commit

Permalink
Merge pull request #255 from vivid-money/feature/actor-switch
Browse files Browse the repository at this point in the history
Make switch extension for Flow inside Actors
  • Loading branch information
rcmkt authored Feb 19, 2024
2 parents 6830973 + f1cdc15 commit da0e737
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,19 @@ package money.vivid.elmslie.core.store

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import money.vivid.elmslie.core.switcher.Switcher
import kotlin.reflect.KClass

abstract class Actor<Command : Any, Event : Any> {

private val switchers = mutableMapOf<KClass<out Any>, Switcher>()
private val mutex = Mutex()

/**
* Executes a command. This method is performed on the [Dispatchers.IO]
* [kotlinx.coroutines.Dispatchers.IO] which is set by ElmslieConfig.ioDispatchers()
Expand All @@ -17,4 +26,21 @@ abstract class Actor<Command : Any, Event : Any> {
errorMapper: (error: Throwable) -> Event? = { null },
) = mapNotNull { eventMapper(it) }
.catch { errorMapper(it)?.let { event -> emit(event) } ?: throw it }

protected fun <T : Any, Command : Any> Flow<T>.asSwitchFlow(command: Command, delayMillis: Long = 0): Flow<T> {
return flow {
val switcher = mutex.withLock {
switchers.getOrPut(command::class) {
Switcher()
}
}
switcher.switch(delayMillis) { this@asSwitchFlow }.collect {
emit(it)
}
}
}

protected fun <T : Any> cancelSwitchFlow(command: KClass<out Any>): Flow<T> {
return switchers[command]?.cancel() ?: emptyFlow()
}
}
Original file line number Diff line number Diff line change
@@ -1,28 +1,26 @@
package money.vivid.elmslie.samples.coroutines.timer.elm

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import money.vivid.elmslie.core.store.Actor
import money.vivid.elmslie.core.switcher.Switcher

internal object TimerActor : Actor<Command, Event>() {

private val switcher = Switcher()

override fun execute(command: Command) =
when (command) {
is Command.Start ->
switcher
.switch { secondsFlow() }
.mapEvents(
eventMapper = { Event.OnTimeTick },
errorMapper = { Event.OnTimeError(it) },
)
is Command.Stop -> switcher.cancel()
is Command.Start -> secondsFlow()
.asSwitchFlow(command)
.mapEvents(
eventMapper = { Event.OnTimeTick },
errorMapper = { Event.OnTimeError(it) },
)

is Command.Stop -> cancelSwitchFlow(Command.Start::class)
}

@Suppress("MagicNumber")
private fun secondsFlow() = flow {
private fun secondsFlow(): Flow<Int> = flow {
repeat(10) {
delay(1000)
emit(it)
Expand Down

0 comments on commit da0e737

Please sign in to comment.