Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make switch extension for Flow inside Actors #255

Merged
merged 2 commits into from
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,19 @@ package money.vivid.elmslie.core.store

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import money.vivid.elmslie.core.switcher.Switcher
import kotlin.reflect.KClass

abstract class Actor<Command : Any, Event : Any> {

private val switchers = mutableMapOf<KClass<out Any>, Switcher>()
private val mutex = Mutex()
opengamer29 marked this conversation as resolved.
Show resolved Hide resolved

/**
* Executes a command. This method is performed on the [Dispatchers.IO]
* [kotlinx.coroutines.Dispatchers.IO] which is set by ElmslieConfig.ioDispatchers()
Expand All @@ -17,4 +26,21 @@ abstract class Actor<Command : Any, Event : Any> {
errorMapper: (error: Throwable) -> Event? = { null },
) = mapNotNull { eventMapper(it) }
.catch { errorMapper(it)?.let { event -> emit(event) } ?: throw it }

protected fun <T : Any, Command : Any> Flow<T>.asSwitchFlow(command: Command, delayMillis: Long = 0): Flow<T> {
rcmkt marked this conversation as resolved.
Show resolved Hide resolved
return flow {
val switcher = mutex.withLock {
switchers.getOrPut(command::class) {
Switcher()
}
}
switcher.switch(delayMillis) { this@asSwitchFlow }.collect {
emit(it)
}
}
}

protected fun <T : Any> cancelSwitchFlow(command: KClass<out Any>): Flow<T> {
return switchers[command]?.cancel() ?: emptyFlow()
rcmkt marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
@@ -1,28 +1,26 @@
package money.vivid.elmslie.samples.coroutines.timer.elm

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import money.vivid.elmslie.core.store.Actor
import money.vivid.elmslie.core.switcher.Switcher

internal object TimerActor : Actor<Command, Event>() {

private val switcher = Switcher()

override fun execute(command: Command) =
when (command) {
is Command.Start ->
switcher
.switch { secondsFlow() }
.mapEvents(
eventMapper = { Event.OnTimeTick },
errorMapper = { Event.OnTimeError(it) },
)
is Command.Stop -> switcher.cancel()
is Command.Start -> secondsFlow()
.asSwitchFlow(command)
.mapEvents(
eventMapper = { Event.OnTimeTick },
errorMapper = { Event.OnTimeError(it) },
)

is Command.Stop -> cancelSwitchFlow(Command.Start::class)
}

@Suppress("MagicNumber")
private fun secondsFlow() = flow {
private fun secondsFlow(): Flow<Int> = flow {
repeat(10) {
delay(1000)
emit(it)
Expand Down
Loading