Skip to content

Commit

Permalink
cancel flow method; rename extension
Browse files Browse the repository at this point in the history
  • Loading branch information
Dmitriy Berdnikov committed Feb 12, 2024
1 parent afddb28 commit f1cdc15
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package money.vivid.elmslie.core.store

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.sync.Mutex
Expand All @@ -11,7 +12,7 @@ import kotlin.reflect.KClass

abstract class Actor<Command : Any, Event : Any> {

protected val switchers = mutableMapOf<KClass<out Any>, Switcher>()
private val switchers = mutableMapOf<KClass<out Any>, Switcher>()
private val mutex = Mutex()

/**
Expand All @@ -26,16 +27,20 @@ abstract class Actor<Command : Any, Event : Any> {
) = mapNotNull { eventMapper(it) }
.catch { errorMapper(it)?.let { event -> emit(event) } ?: throw it }

protected fun <T : Any, Command : Any> Flow<T>.switchOnEach(command: Command, delayMillis: Long = 0): Flow<T> {
protected fun <T : Any, Command : Any> Flow<T>.asSwitchFlow(command: Command, delayMillis: Long = 0): Flow<T> {
return flow {
val switcher = mutex.withLock {
switchers.getOrPut(command::class) {
Switcher()
}
}
switcher.switch(delayMillis) { this@switchOnEach }.collect {
switcher.switch(delayMillis) { this@asSwitchFlow }.collect {
emit(it)
}
}
}

protected fun <T : Any> cancelSwitchFlow(command: KClass<out Any>): Flow<T> {
return switchers[command]?.cancel() ?: emptyFlow()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,19 @@ import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import money.vivid.elmslie.core.store.Actor
import money.vivid.elmslie.core.switcher.Switcher

internal object TimerActor : Actor<Command, Event>() {

private val switcher = Switcher()

override fun execute(command: Command) =
when (command) {
is Command.Start -> secondsFlow()
.switchOnEach(command)
.asSwitchFlow(command)
.mapEvents(
eventMapper = { Event.OnTimeTick },
errorMapper = { Event.OnTimeError(it) },
)

is Command.Stop -> switchers.getOrPut(Command.Start::class) {
Switcher()
}.cancel()
is Command.Stop -> cancelSwitchFlow(Command.Start::class)
}

@Suppress("MagicNumber")
Expand Down

0 comments on commit f1cdc15

Please sign in to comment.