Skip to content

Commit

Permalink
Merge pull request #260 from vivid-money/feature/restrict-event-handl…
Browse files Browse the repository at this point in the history
…e-palallelism

handle incoming event sequentially by limiting dispatcher
  • Loading branch information
rcmkt authored Apr 22, 2024
2 parents db0d7d1 + ca686fb commit bd7303f
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 41 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package money.vivid.elmslie.core.store

import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
Expand All @@ -13,27 +15,26 @@ import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import money.vivid.elmslie.core.ElmScope
import money.vivid.elmslie.core.config.ElmslieConfig

@Suppress("TooGenericExceptionCaught")
@OptIn(ExperimentalCoroutinesApi::class)
class ElmStore<Event : Any, State : Any, Effect : Any, Command : Any>(
initialState: State,
private val reducer: StateReducer<Event, State, Effect, Command>,
private val actor: Actor<Command, out Event>,
storeListeners: Set<StoreListener<Event, State, Effect, Command>>? = null,
override val startEvent: Event? = null,
private val key: String =
(reducer::class.qualifiedName?: reducer::class.simpleName).orEmpty().replace(
(reducer::class.qualifiedName ?: reducer::class.simpleName).orEmpty().replace(
"Reducer",
"Store",
),
) : Store<Event, Effect, State> {

private val logger = ElmslieConfig.logger
private val eventMutex = Mutex()
private val eventDispatcher = ElmslieConfig.ioDispatchers.limitedParallelism(parallelism = 1)

private val effectsFlow = MutableSharedFlow<Effect>()

Expand All @@ -51,7 +52,9 @@ class ElmStore<Event : Any, State : Any, Effect : Any, Command : Any>(

override val effects: Flow<Effect> = effectsFlow.asSharedFlow()

override fun accept(event: Event) = dispatchEvent(event)
override fun accept(event: Event) {
scope.handleEvent(event)
}

override fun start(): Store<Event, Effect, State> {
startEvent?.let(::accept)
Expand All @@ -62,37 +65,30 @@ class ElmStore<Event : Any, State : Any, Effect : Any, Command : Any>(
scope.cancel()
}

private fun dispatchEvent(event: Event) {
scope.launch {
try {
storeListeners.forEach { it.onBeforeEvent(key, event, statesFlow.value) }
logger.debug(
message = "New event: $event",
tag = key,
)
val (_, effects, commands) =
eventMutex.withLock {
val oldState = statesFlow.value
val result = reducer.reduce(event, statesFlow.value)
val newState = result.state
statesFlow.value = newState
storeListeners.forEach {
it.onAfterEvent(key, newState, oldState, event)
}
result
}
effects.forEach { effect -> if (isActive) dispatchEffect(effect) }
commands.forEach { if (isActive) executeCommand(it) }
} catch (error: CancellationException) {
throw error
} catch (t: Throwable) {
storeListeners.forEach { it.onReducerError(key, t, event) }
logger.fatal(
message = "You must handle all errors inside reducer",
tag = key,
error = t,
)
private fun CoroutineScope.handleEvent(event: Event) = launch(eventDispatcher) {
try {
storeListeners.forEach { it.onBeforeEvent(key, event, statesFlow.value) }
logger.debug(
message = "New event: $event",
tag = key,
)
val oldState = statesFlow.value
val (state, effects, commands) = reducer.reduce(event, statesFlow.value)
statesFlow.value = state
storeListeners.forEach {
it.onAfterEvent(key, state, oldState, event)
}
effects.forEach { effect -> if (isActive) dispatchEffect(effect) }
commands.forEach { if (isActive) executeCommand(it) }
} catch (error: CancellationException) {
throw error
} catch (t: Throwable) {
storeListeners.forEach { it.onReducerError(key, t, event) }
logger.fatal(
message = "You must handle all errors inside reducer",
tag = key,
error = t,
)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package money.vivid.elmslie.core.store

import kotlin.test.AfterTest
import kotlin.test.BeforeTest
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
Expand All @@ -14,6 +10,7 @@ import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.StandardTestDispatcher
import kotlinx.coroutines.test.advanceTimeBy
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.resetMain
import kotlinx.coroutines.test.runCurrent
Expand All @@ -24,6 +21,10 @@ import money.vivid.elmslie.core.testutil.model.Command
import money.vivid.elmslie.core.testutil.model.Effect
import money.vivid.elmslie.core.testutil.model.Event
import money.vivid.elmslie.core.testutil.model.State
import kotlin.test.AfterTest
import kotlin.test.BeforeTest
import kotlin.test.Test
import kotlin.test.assertEquals

@OptIn(ExperimentalCoroutinesApi::class)
class ElmStoreTest {
Expand Down Expand Up @@ -73,8 +74,7 @@ class ElmStoreTest {
val emittedStates = mutableListOf<State>()
val collectJob = launch { store.states.toList(emittedStates) }
store.accept(Event())
runCurrent()
delay(3500)
advanceTimeBy(3500)
store.stop()

assertEquals(
Expand Down

0 comments on commit bd7303f

Please sign in to comment.