Skip to content

Commit

Permalink
Allow to specify default dispatcher for formula.
Browse files Browse the repository at this point in the history
  • Loading branch information
Laimiux committed Mar 4, 2024
1 parent babd43c commit 2769c43
Show file tree
Hide file tree
Showing 16 changed files with 95 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package com.instacart.formula.coroutines
import com.instacart.formula.FormulaRuntime
import com.instacart.formula.IFormula
import com.instacart.formula.RuntimeConfig
import com.instacart.formula.plugin.Inspector
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.channels.trySendBlocking
Expand Down
4 changes: 2 additions & 2 deletions formula/src/main/java/com/instacart/formula/Action.kt
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ package com.instacart.formula
*/
interface Action<Event> {
companion object {
private val INIT_ACTION = StartEventAction(Unit)

/**
* Emits an event when [Action] is initialized. You can use this action to send an event
Expand All @@ -44,8 +45,7 @@ interface Action<Event> {
* }
*/
fun onInit(): Action<Unit> {
@Suppress("UNCHECKED_CAST")
return StartEventAction(Unit)
return INIT_ACTION
}

/**
Expand Down
17 changes: 4 additions & 13 deletions formula/src/main/java/com/instacart/formula/ActionBuilder.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.instacart.formula

import com.instacart.formula.plugin.Dispatcher

/**
* Action builder is used to create a list of deferred [actions][Action] that
* will be executed by Formula runtime after Formula evaluation finished. To
Expand Down Expand Up @@ -36,19 +38,7 @@ abstract class ActionBuilder<out Input, State>(
*/
abstract fun <Event> events(
action: Action<Event>,
transition: Transition<Input, State, Event>,
)

/**
* Adds an [Action] as part of this [Evaluation]. [Action] will be initialized
* when it is initially added and cleaned up when it is not returned
* as part of [Evaluation].
*
* @param transition A function that is invoked when [Action] emits an [Event].
*/
abstract fun <Event> onEvent(
action: Action<Event>,
avoidParameterClash: Any = this,
dispatcher: Dispatcher? = null,
transition: Transition<Input, State, Event>,
)

Expand All @@ -67,6 +57,7 @@ abstract class ActionBuilder<out Input, State>(
* ```
*/
abstract fun <Event> Action<Event>.onEvent(
dispatcher: Dispatcher? = null,
transition: Transition<Input, State, Event>,
)
}
17 changes: 16 additions & 1 deletion formula/src/main/java/com/instacart/formula/FormulaContext.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package com.instacart.formula

import com.instacart.formula.internal.Listeners
import com.instacart.formula.internal.UnitListener
import com.instacart.formula.plugin.Dispatcher
import kotlin.reflect.KClass

/**
Expand All @@ -22,12 +23,24 @@ abstract class FormulaContext<out Input, State> internal constructor(
*/
fun callback(
key: Any? = null,
dispatcher: Dispatcher? = null,
transition: Transition<Input, State, Unit>,
): () -> Unit {
val listener = onEvent(key, transition)
val listener = onEvent(key, dispatcher, transition)
return UnitListener(listener)
}


/**
* Creates a listener that takes an event and performs a [Transition].
*/
fun callback(
dispatcher: Dispatcher,
transition: Transition<Input, State, Unit>,
): () -> Unit {
val listener = onEvent(null, dispatcher, transition)
return UnitListener(listener)
}
/**
* Creates a [Listener] that takes a [Event] and performs a [Transition]. It uses a composite
* key of [transition] type and optional [key] property.
Expand All @@ -37,6 +50,7 @@ abstract class FormulaContext<out Input, State> internal constructor(
*/
fun <Event> onEvent(
key: Any? = null,
dispatcher: Dispatcher? = null,
transition: Transition<Input, State, Event>,
): Listener<Event> {
return eventListener(
Expand Down Expand Up @@ -85,6 +99,7 @@ abstract class FormulaContext<out Input, State> internal constructor(
// Internal listener management
internal abstract fun <Event> eventListener(
key: Any,
dispatcher: Dispatcher? = null,
useIndex: Boolean = true,
transition: Transition<Input, State, Event>
): Listener<Event>
Expand Down
4 changes: 2 additions & 2 deletions formula/src/main/java/com/instacart/formula/FormulaPlugins.kt
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ object FormulaPlugins {
}

fun mainThreadDispatcher(): Dispatcher {
return plugin?.mainThreadDispatcher() ?: Dispatcher.NoOp
return plugin?.mainThreadDispatcher() ?: Dispatcher.None
}

fun backgroundThreadDispatcher(): Dispatcher {
return plugin?.backgroundThreadDispatcher() ?: Dispatcher.NoOp
return plugin?.backgroundThreadDispatcher() ?: Dispatcher.None
}
}
15 changes: 8 additions & 7 deletions formula/src/main/java/com/instacart/formula/FormulaRuntime.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import com.instacart.formula.internal.FormulaManagerImpl
import com.instacart.formula.internal.ManagerDelegate
import com.instacart.formula.internal.SynchronizedUpdateQueue
import com.instacart.formula.plugin.Dispatcher
import com.instacart.formula.plugin.Inspector
import java.util.LinkedList
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
Expand All @@ -20,14 +19,15 @@ class FormulaRuntime<Input : Any, Output : Any>(
config: RuntimeConfig?,
) : ManagerDelegate {
private val isValidationEnabled = config?.isValidationEnabled ?: false
private val synchronizedUpdateQueue = SynchronizedUpdateQueue(
onEmpty = { emitOutputIfNeeded() }
)
private val inspector = FormulaPlugins.inspector(
type = formula.type(),
local = config?.inspector,
)
private val eventDispatcher: Dispatcher = config?.defaultDispatcher ?: Dispatcher.None
private val implementation = formula.implementation()
private val synchronizedUpdateQueue = SynchronizedUpdateQueue(
onEmpty = { emitOutputIfNeeded() }
)

@Volatile
private var manager: FormulaManagerImpl<Input, *, Output>? = null
Expand Down Expand Up @@ -249,9 +249,9 @@ class FormulaRuntime<Input : Any, Output : Any>(
while (globalEffectQueue.isNotEmpty()) {
val effect = globalEffectQueue.pollFirst()
val dispatcher = when (effect.type) {
Effect.Unconfined -> Dispatcher.NoOp
Effect.Main -> FormulaPlugins.mainThreadDispatcher()
Effect.Background -> FormulaPlugins.backgroundThreadDispatcher()
Effect.Unconfined -> Dispatcher.None
Effect.Main -> Dispatcher.Main
Effect.Background -> Dispatcher.Background
}
dispatcher.dispatch(effect.executable)
}
Expand Down Expand Up @@ -310,6 +310,7 @@ class FormulaRuntime<Input : Any, Output : Any>(
initialInput = initialInput,
loggingType = formula::class,
inspector = inspector,
eventDispatcher = eventDispatcher,
)
}
}
10 changes: 10 additions & 0 deletions formula/src/main/java/com/instacart/formula/RuntimeConfig.kt
Original file line number Diff line number Diff line change
@@ -1,8 +1,18 @@
package com.instacart.formula

import com.instacart.formula.plugin.Inspector
import com.instacart.formula.plugin.Dispatcher

/**
* @param defaultDispatcher Dispatcher used for event processing (this can be overwritten by
* individual events). By default, formula runs on the thread on which the event arrived on.
* @param inspector Inspector that will be used when configuring the formula.
* @param isValidationEnabled A boolean that validates inputs and outputs by
* running [Formula.evaluate] twice. Should NOT be used in production builds,
* preferably only unit tests.
*/
class RuntimeConfig(
val defaultDispatcher: Dispatcher? = null,
val inspector: Inspector? = null,
val isValidationEnabled: Boolean = false,
)
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import com.instacart.formula.ActionBuilder
import com.instacart.formula.DeferredAction
import com.instacart.formula.Snapshot
import com.instacart.formula.Transition
import com.instacart.formula.plugin.Dispatcher

/**
* Implements [ActionBuilder] interface.
Expand All @@ -19,24 +20,18 @@ internal class ActionBuilderImpl<out Input, State> internal constructor(

override fun <Event> events(
action: Action<Event>,
dispatcher: Dispatcher?,
transition: Transition<Input, State, Event>,
) {
add(toBoundStream(action, transition))
}

override fun <Event> onEvent(
action: Action<Event>,
avoidParameterClash: Any,
transition: Transition<Input, State, Event>,
) {
add(toBoundStream(action, transition))
add(toBoundStream(action, dispatcher, transition))
}

override fun <Event> Action<Event>.onEvent(
dispatcher: Dispatcher?,
transition: Transition<Input, State, Event>,
) {
val stream = this
this@ActionBuilderImpl.events(stream, transition)
this@ActionBuilderImpl.events(stream, dispatcher, transition)
}

private fun add(action: DeferredAction<*>) {
Expand All @@ -49,10 +44,11 @@ internal class ActionBuilderImpl<out Input, State> internal constructor(

private fun <Event> toBoundStream(
stream: Action<Event>,
dispatcher: Dispatcher? = null,
transition: Transition<Input, State, Event>,
): DeferredAction<Event> {
val key = snapshot.context.createScopedKey(transition.type(), stream.key())
val listener = snapshot.context.eventListener(key, useIndex = false, transition)
val listener = snapshot.context.eventListener(key, dispatcher = dispatcher, useIndex = false, transition)
return DeferredAction(
key = key,
action = stream,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ internal class ChildrenManager(
formula = implementation,
initialInput = input,
loggingType = formula::class,
inspector = inspector
inspector = inspector,
eventDispatcher = delegate.eventDispatcher,
)
}
@Suppress("UNCHECKED_CAST")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import com.instacart.formula.IFormula
import com.instacart.formula.plugin.Inspector
import com.instacart.formula.Snapshot
import com.instacart.formula.Transition
import com.instacart.formula.plugin.Dispatcher
import java.util.LinkedList
import kotlin.reflect.KClass

Expand All @@ -25,6 +26,7 @@ internal class FormulaManagerImpl<Input, State, Output>(
internal val loggingType: KClass<*>,
private val listeners: Listeners = Listeners(),
private val inspector: Inspector?,
val eventDispatcher: Dispatcher,
) : FormulaManager<Input, Output>, ManagerDelegate {

private var state: State = formula.initialState(initialInput)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package com.instacart.formula.internal

import com.instacart.formula.Listener
import com.instacart.formula.Transition
import com.instacart.formula.plugin.Dispatcher

/**
* Note: this class is not a data class because equality is based on instance and not [key].
Expand All @@ -11,16 +12,20 @@ internal class ListenerImpl<Input, State, EventT>(internal var key: Any) : Liste

@Volatile internal var manager: FormulaManagerImpl<Input, State, *>? = null
@Volatile internal var snapshotImpl: SnapshotImpl<Input, State>? = null
@Volatile internal var dispatcher: Dispatcher? = null

internal lateinit var transition: Transition<Input, State, EventT>

override fun invoke(event: EventT) {
// TODO: log if null listener (it might be due to formula removal or due to callback removal)
val manager = manager ?: return

manager.queue.postUpdate {
val deferredTransition = DeferredTransition(this, transition, event)
manager.onPendingTransition(deferredTransition)
val dispatcher = dispatcher ?: manager.eventDispatcher
dispatcher.dispatch {
manager.queue.postUpdate {
val deferredTransition = DeferredTransition(this, transition, event)
manager.onPendingTransition(deferredTransition)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import com.instacart.formula.Listener
import com.instacart.formula.Snapshot
import com.instacart.formula.Transition
import com.instacart.formula.TransitionContext
import com.instacart.formula.plugin.Dispatcher
import java.lang.IllegalStateException
import kotlin.reflect.KClass

Expand Down Expand Up @@ -46,13 +47,15 @@ internal class SnapshotImpl<out Input, State> internal constructor(

override fun <Event> eventListener(
key: Any,
dispatcher: Dispatcher?,
useIndex: Boolean,
transition: Transition<Input, State, Event>
): Listener<Event> {
ensureNotRunning()
val listener = listeners.initOrFindListener<Input, State, Event>(key, useIndex)
listener.manager = delegate
listener.snapshotImpl = this
listener.dispatcher = dispatcher
listener.transition = transition
return listener
}
Expand Down
27 changes: 26 additions & 1 deletion formula/src/main/java/com/instacart/formula/plugin/Dispatcher.kt
Original file line number Diff line number Diff line change
@@ -1,14 +1,39 @@
package com.instacart.formula.plugin

import com.instacart.formula.FormulaPlugins

/**
* Dispatches executables to a specific thread.
*/
interface Dispatcher {
object NoOp : Dispatcher {
object None : Dispatcher {
override fun dispatch(executable: () -> Unit) {
executable()
}
}

/**
* Uses [Plugin.mainThreadDispatcher] to dispatch executables.
*/
object Main : Dispatcher {
override fun dispatch(executable: () -> Unit) {
val delegate = FormulaPlugins.mainThreadDispatcher()
delegate.dispatch(executable)
}
}

/**
* Uses [Plugin.backgroundThreadDispatcher] to dispatch executables.
*/
object Background : Dispatcher {
override fun dispatch(executable: () -> Unit) {
val delegate = FormulaPlugins.backgroundThreadDispatcher()
delegate.dispatch(executable)
}
}

/**
* Dispatches [executable] to a thread specified by the [Dispatcher].
*/
fun dispatch(executable: () -> Unit)
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.instacart.formula.subjects
import com.instacart.formula.Evaluation
import com.instacart.formula.Formula
import com.instacart.formula.Snapshot
import com.instacart.formula.plugin.Dispatcher
import com.instacart.formula.rxjava3.RxAction
import com.instacart.formula.test.TestableRuntime
import io.reactivex.rxjava3.core.Observable
Expand Down Expand Up @@ -74,7 +75,7 @@ class MultiChildIndirectStateChangeRobot(runtime: TestableRuntime) {
override fun initialState(input: Unit): State = State()

override fun Snapshot<Unit, State>.evaluate(): Evaluation<Output> {
val next = context.callback {
val next = context.callback(Dispatcher.None) {
val newState = state.copy(actionId = state.actionId + 1)
transition(newState)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ class StartStopFormula(runtime: TestableRuntime) : Formula<Unit, State, Output>(
output = Output(
state = state.count,
// We need to specify keys since `UpdateListenFlag` type is used two times.
startListening = context.onEvent("start", UpdateListenFlag(listen = true)),
stopListening = context.onEvent("stop", UpdateListenFlag(listen = false)),
startListening = context.onEvent("start", transition = UpdateListenFlag(listen = true)),
stopListening = context.onEvent("stop", transition = UpdateListenFlag(listen = false)),
)
)
}
Expand Down
Loading

0 comments on commit 2769c43

Please sign in to comment.