Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

handle incoming event sequentially by limiting dispatcher #260

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package money.vivid.elmslie.core.store

import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
Expand All @@ -13,27 +15,26 @@ import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import money.vivid.elmslie.core.ElmScope
import money.vivid.elmslie.core.config.ElmslieConfig

@Suppress("TooGenericExceptionCaught")
@OptIn(ExperimentalCoroutinesApi::class)
class ElmStore<Event : Any, State : Any, Effect : Any, Command : Any>(
initialState: State,
private val reducer: StateReducer<Event, State, Effect, Command>,
private val actor: Actor<Command, out Event>,
storeListeners: Set<StoreListener<Event, State, Effect, Command>>? = null,
override val startEvent: Event? = null,
private val key: String =
(reducer::class.qualifiedName?: reducer::class.simpleName).orEmpty().replace(
(reducer::class.qualifiedName ?: reducer::class.simpleName).orEmpty().replace(
"Reducer",
"Store",
),
) : Store<Event, Effect, State> {

private val logger = ElmslieConfig.logger
private val eventMutex = Mutex()
private val eventDispatcher = ElmslieConfig.ioDispatchers.limitedParallelism(parallelism = 1)
opengamer29 marked this conversation as resolved.
Show resolved Hide resolved
opengamer29 marked this conversation as resolved.
Show resolved Hide resolved

private val effectsFlow = MutableSharedFlow<Effect>()

Expand All @@ -51,7 +52,9 @@ class ElmStore<Event : Any, State : Any, Effect : Any, Command : Any>(

override val effects: Flow<Effect> = effectsFlow.asSharedFlow()

override fun accept(event: Event) = dispatchEvent(event)
override fun accept(event: Event) {
scope.handleEvent(event)
}

override fun start(): Store<Event, Effect, State> {
startEvent?.let(::accept)
Expand All @@ -62,37 +65,30 @@ class ElmStore<Event : Any, State : Any, Effect : Any, Command : Any>(
scope.cancel()
}

private fun dispatchEvent(event: Event) {
scope.launch {
try {
storeListeners.forEach { it.onBeforeEvent(key, event, statesFlow.value) }
logger.debug(
message = "New event: $event",
tag = key,
)
val (_, effects, commands) =
eventMutex.withLock {
val oldState = statesFlow.value
val result = reducer.reduce(event, statesFlow.value)
val newState = result.state
statesFlow.value = newState
storeListeners.forEach {
it.onAfterEvent(key, newState, oldState, event)
}
result
}
effects.forEach { effect -> if (isActive) dispatchEffect(effect) }
commands.forEach { if (isActive) executeCommand(it) }
} catch (error: CancellationException) {
throw error
} catch (t: Throwable) {
storeListeners.forEach { it.onReducerError(key, t, event) }
logger.fatal(
message = "You must handle all errors inside reducer",
tag = key,
error = t,
)
private fun CoroutineScope.handleEvent(event: Event) = launch(eventDispatcher) {
opengamer29 marked this conversation as resolved.
Show resolved Hide resolved
try {
storeListeners.forEach { it.onBeforeEvent(key, event, statesFlow.value) }
logger.debug(
message = "New event: $event",
tag = key,
)
val oldState = statesFlow.value
val (state, effects, commands) = reducer.reduce(event, statesFlow.value)
statesFlow.value = state
storeListeners.forEach {
it.onAfterEvent(key, state, oldState, event)
}
effects.forEach { effect -> if (isActive) dispatchEffect(effect) }
commands.forEach { if (isActive) executeCommand(it) }
} catch (error: CancellationException) {
throw error
} catch (t: Throwable) {
storeListeners.forEach { it.onReducerError(key, t, event) }
logger.fatal(
message = "You must handle all errors inside reducer",
tag = key,
error = t,
)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package money.vivid.elmslie.core.store

import kotlin.test.AfterTest
import kotlin.test.BeforeTest
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
Expand All @@ -14,6 +10,7 @@ import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.StandardTestDispatcher
import kotlinx.coroutines.test.advanceTimeBy
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.resetMain
import kotlinx.coroutines.test.runCurrent
Expand All @@ -24,6 +21,10 @@ import money.vivid.elmslie.core.testutil.model.Command
import money.vivid.elmslie.core.testutil.model.Effect
import money.vivid.elmslie.core.testutil.model.Event
import money.vivid.elmslie.core.testutil.model.State
import kotlin.test.AfterTest
import kotlin.test.BeforeTest
import kotlin.test.Test
import kotlin.test.assertEquals

@OptIn(ExperimentalCoroutinesApi::class)
class ElmStoreTest {
Expand Down Expand Up @@ -73,8 +74,7 @@ class ElmStoreTest {
val emittedStates = mutableListOf<State>()
val collectJob = launch { store.states.toList(emittedStates) }
store.accept(Event())
runCurrent()
delay(3500)
advanceTimeBy(3500)
store.stop()

assertEquals(
Expand Down
Loading