Skip to content

Commit

Permalink
Allow to specify default dispatcher for formula.
Browse files Browse the repository at this point in the history
  • Loading branch information
Laimiux committed Mar 5, 2024
1 parent babd43c commit 990134b
Show file tree
Hide file tree
Showing 18 changed files with 133 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package com.instacart.formula.coroutines
import com.instacart.formula.FormulaRuntime
import com.instacart.formula.IFormula
import com.instacart.formula.RuntimeConfig
import com.instacart.formula.plugin.Inspector
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.channels.trySendBlocking
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package com.instacart.formula.test

import com.instacart.formula.IFormula
import com.instacart.formula.RuntimeConfig
import com.instacart.formula.plugin.Dispatcher
import com.instacart.formula.plugin.Inspector
import com.instacart.formula.rxjava3.toObservable
import io.reactivex.rxjava3.subjects.BehaviorSubject
Expand All @@ -11,12 +12,14 @@ import io.reactivex.rxjava3.subjects.BehaviorSubject
*/
class RxJavaFormulaTestDelegate<Input : Any, Output : Any, FormulaT : IFormula<Input, Output>>(
override val formula: FormulaT,
private val isValidationEnabled: Boolean = true,
private val inspector: Inspector? = null,
isValidationEnabled: Boolean = true,
inspector: Inspector? = null,
dispatcher: Dispatcher? = null,
) : FormulaTestDelegate<Input, Output, FormulaT> {
private val runtimeConfig = RuntimeConfig(
isValidationEnabled = isValidationEnabled,
inspector = inspector,
defaultDispatcher = dispatcher,
)

private val inputRelay = BehaviorSubject.create<Input>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package com.instacart.formula.test

import com.instacart.formula.Action
import com.instacart.formula.IFormula
import com.instacart.formula.plugin.Dispatcher
import com.instacart.formula.plugin.Inspector

/**
Expand All @@ -12,8 +13,9 @@ import com.instacart.formula.plugin.Inspector
fun <Input : Any, Output : Any, F: IFormula<Input, Output>> F.test(
isValidationEnabled: Boolean = true,
inspector: Inspector? = null,
dispatcher: Dispatcher? = null,
): TestFormulaObserver<Input, Output, F> {
val delegate = RxJavaFormulaTestDelegate(this, isValidationEnabled, inspector)
val delegate = RxJavaFormulaTestDelegate(this, isValidationEnabled, inspector, dispatcher)
return TestFormulaObserver(delegate)
}

Expand All @@ -26,8 +28,9 @@ fun <Input : Any, Output : Any, F: IFormula<Input, Output>> F.test(
initialInput: Input,
isValidationEnabled: Boolean = true,
inspector: Inspector? = null,
dispatcher: Dispatcher? = null,
): TestFormulaObserver<Input, Output, F> {
return test(isValidationEnabled, inspector).apply {
return test(isValidationEnabled, inspector, dispatcher).apply {
input(initialInput)
}
}
Expand Down
4 changes: 2 additions & 2 deletions formula/src/main/java/com/instacart/formula/Action.kt
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ package com.instacart.formula
*/
interface Action<Event> {
companion object {
private val INIT_ACTION = StartEventAction(Unit)

/**
* Emits an event when [Action] is initialized. You can use this action to send an event
Expand All @@ -44,8 +45,7 @@ interface Action<Event> {
* }
*/
fun onInit(): Action<Unit> {
@Suppress("UNCHECKED_CAST")
return StartEventAction(Unit)
return INIT_ACTION
}

/**
Expand Down
2 changes: 2 additions & 0 deletions formula/src/main/java/com/instacart/formula/Effect.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.instacart.formula

import com.instacart.formula.plugin.Plugin

/**
* Effect is a function returned within [Transition.Result] which will be executed
* by [FormulaRuntime]. The execution timing and thread will depend on the [Effect.Type]
Expand Down
8 changes: 6 additions & 2 deletions formula/src/main/java/com/instacart/formula/FormulaPlugins.kt
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,14 @@ object FormulaPlugins {
}

fun mainThreadDispatcher(): Dispatcher {
return plugin?.mainThreadDispatcher() ?: Dispatcher.NoOp
return plugin?.mainThreadDispatcher() ?: Dispatcher.None
}

fun backgroundThreadDispatcher(): Dispatcher {
return plugin?.backgroundThreadDispatcher() ?: Dispatcher.NoOp
return plugin?.backgroundThreadDispatcher() ?: Dispatcher.None
}

fun defaultDispatcher(): Dispatcher {
return plugin?.defaultDispatcher() ?: Dispatcher.None
}
}
15 changes: 8 additions & 7 deletions formula/src/main/java/com/instacart/formula/FormulaRuntime.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import com.instacart.formula.internal.FormulaManagerImpl
import com.instacart.formula.internal.ManagerDelegate
import com.instacart.formula.internal.SynchronizedUpdateQueue
import com.instacart.formula.plugin.Dispatcher
import com.instacart.formula.plugin.Inspector
import java.util.LinkedList
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
Expand All @@ -20,14 +19,15 @@ class FormulaRuntime<Input : Any, Output : Any>(
config: RuntimeConfig?,
) : ManagerDelegate {
private val isValidationEnabled = config?.isValidationEnabled ?: false
private val synchronizedUpdateQueue = SynchronizedUpdateQueue(
onEmpty = { emitOutputIfNeeded() }
)
private val inspector = FormulaPlugins.inspector(
type = formula.type(),
local = config?.inspector,
)
private val defaultDispatcher: Dispatcher = config?.defaultDispatcher ?: FormulaPlugins.defaultDispatcher()
private val implementation = formula.implementation()
private val synchronizedUpdateQueue = SynchronizedUpdateQueue(
onEmpty = { emitOutputIfNeeded() }
)

@Volatile
private var manager: FormulaManagerImpl<Input, *, Output>? = null
Expand Down Expand Up @@ -249,9 +249,9 @@ class FormulaRuntime<Input : Any, Output : Any>(
while (globalEffectQueue.isNotEmpty()) {
val effect = globalEffectQueue.pollFirst()
val dispatcher = when (effect.type) {
Effect.Unconfined -> Dispatcher.NoOp
Effect.Main -> FormulaPlugins.mainThreadDispatcher()
Effect.Background -> FormulaPlugins.backgroundThreadDispatcher()
Effect.Unconfined -> Dispatcher.None
Effect.Main -> Dispatcher.Main
Effect.Background -> Dispatcher.Background
}
dispatcher.dispatch(effect.executable)
}
Expand Down Expand Up @@ -310,6 +310,7 @@ class FormulaRuntime<Input : Any, Output : Any>(
initialInput = initialInput,
loggingType = formula::class,
inspector = inspector,
defaultDispatcher = defaultDispatcher,
)
}
}
10 changes: 10 additions & 0 deletions formula/src/main/java/com/instacart/formula/RuntimeConfig.kt
Original file line number Diff line number Diff line change
@@ -1,8 +1,18 @@
package com.instacart.formula

import com.instacart.formula.plugin.Inspector
import com.instacart.formula.plugin.Dispatcher

/**
* @param defaultDispatcher Dispatcher used for event processing (this can be overwritten by
* individual events). By default, formula runs on the thread on which the event arrived on.
* @param inspector Inspector that will be used when configuring the formula.
* @param isValidationEnabled A boolean that validates inputs and outputs by
* running [Formula.evaluate] twice. Should NOT be used in production builds,
* preferably only unit tests.
*/
class RuntimeConfig(
val defaultDispatcher: Dispatcher? = null,
val inspector: Inspector? = null,
val isValidationEnabled: Boolean = false,
)
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ internal class ChildrenManager(
formula: IFormula<ChildInput, ChildOutput>,
input: ChildInput,
): SingleRequestHolder<FormulaManager<ChildInput, ChildOutput>> {
@Suppress("UNCHECKED_CAST")
val children = children ?: run {
val initialized: SingleRequestMap<Any, FormulaManager<*, *>> = LinkedHashMap()
this.children = initialized
Expand All @@ -115,7 +114,8 @@ internal class ChildrenManager(
formula = implementation,
initialInput = input,
loggingType = formula::class,
inspector = inspector
inspector = inspector,
defaultDispatcher = delegate.defaultDispatcher,
)
}
@Suppress("UNCHECKED_CAST")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import com.instacart.formula.IFormula
import com.instacart.formula.plugin.Inspector
import com.instacart.formula.Snapshot
import com.instacart.formula.Transition
import com.instacart.formula.plugin.Dispatcher
import java.util.LinkedList
import kotlin.reflect.KClass

Expand All @@ -25,6 +26,7 @@ internal class FormulaManagerImpl<Input, State, Output>(
internal val loggingType: KClass<*>,
private val listeners: Listeners = Listeners(),
private val inspector: Inspector?,
val defaultDispatcher: Dispatcher,
) : FormulaManager<Input, Output>, ManagerDelegate {

private var state: State = formula.initialState(initialInput)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package com.instacart.formula.internal

import com.instacart.formula.Listener
import com.instacart.formula.Transition
import com.instacart.formula.plugin.Dispatcher

/**
* Note: this class is not a data class because equality is based on instance and not [key].
Expand All @@ -18,9 +19,12 @@ internal class ListenerImpl<Input, State, EventT>(internal var key: Any) : Liste
// TODO: log if null listener (it might be due to formula removal or due to callback removal)
val manager = manager ?: return

manager.queue.postUpdate {
val deferredTransition = DeferredTransition(this, transition, event)
manager.onPendingTransition(deferredTransition)
val dispatcher = manager.defaultDispatcher
dispatcher.dispatch {
manager.queue.postUpdate {
val deferredTransition = DeferredTransition(this, transition, event)
manager.onPendingTransition(deferredTransition)
}
}
}

Expand Down
27 changes: 26 additions & 1 deletion formula/src/main/java/com/instacart/formula/plugin/Dispatcher.kt
Original file line number Diff line number Diff line change
@@ -1,14 +1,39 @@
package com.instacart.formula.plugin

import com.instacart.formula.FormulaPlugins

/**
* Dispatches executables to a specific thread.
*/
interface Dispatcher {
object NoOp : Dispatcher {
object None : Dispatcher {
override fun dispatch(executable: () -> Unit) {
executable()
}
}

/**
* Uses [Plugin.mainThreadDispatcher] to dispatch executables.
*/
object Main : Dispatcher {
override fun dispatch(executable: () -> Unit) {
val delegate = FormulaPlugins.mainThreadDispatcher()
delegate.dispatch(executable)
}
}

/**
* Uses [Plugin.backgroundThreadDispatcher] to dispatch executables.
*/
object Background : Dispatcher {
override fun dispatch(executable: () -> Unit) {
val delegate = FormulaPlugins.backgroundThreadDispatcher()
delegate.dispatch(executable)
}
}

/**
* Dispatches [executable] to a thread specified by the [Dispatcher].
*/
fun dispatch(executable: () -> Unit)
}
8 changes: 8 additions & 0 deletions formula/src/main/java/com/instacart/formula/plugin/Plugin.kt
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,12 @@ interface Plugin {
fun backgroundThreadDispatcher(): Dispatcher? {
return null
}

/**
* Default dispatcher that each formula runtime will use to process events. This
* can be overwritten by each formula individually.
*/
fun defaultDispatcher(): Dispatcher? {
return null
}
}
28 changes: 28 additions & 0 deletions formula/src/test/java/com/instacart/formula/FormulaRuntimeTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import com.instacart.formula.subjects.EventFormula
import com.instacart.formula.subjects.ExtremelyNestedFormula
import com.instacart.formula.subjects.FromObservableWithInputFormula
import com.instacart.formula.subjects.HasChildFormula
import com.instacart.formula.subjects.IncrementingDispatcher
import com.instacart.formula.subjects.InputChangeWhileFormulaRunningRobot
import com.instacart.formula.subjects.KeyFormula
import com.instacart.formula.subjects.KeyUsingListFormula
Expand Down Expand Up @@ -1465,5 +1466,32 @@ class FormulaRuntimeTest(val runtime: TestableRuntime, val name: String) {
plugin.mainDispatcher.assertCalled(1)
plugin.backgroundDispatcher.assertCalled(1)
}

@Test fun `use global background dispatcher`() {
val globalDispatcher = IncrementingDispatcher()
val plugin = TestDispatcherPlugin(defaultDispatcher = globalDispatcher)
FormulaPlugins.setPlugin(plugin)

val formula = IncrementFormula()
val subject = runtime.test(formula, Unit)
globalDispatcher.assertCalled(0)
subject.output { onIncrement() }
globalDispatcher.assertCalled(1)
}

@Test fun `specify formula-level dispatcher`() {
val globalDispatcher = IncrementingDispatcher()
val plugin = TestDispatcherPlugin(defaultDispatcher = globalDispatcher)
FormulaPlugins.setPlugin(plugin)

val formulaDispatcher = IncrementingDispatcher()
val formula = IncrementFormula()
val subject = runtime.test(formula, Unit, dispatcher = formulaDispatcher)
globalDispatcher.assertCalled(0)
formulaDispatcher.assertCalled(0)
subject.output { onIncrement() }
globalDispatcher.assertCalled(0)
formulaDispatcher.assertCalled(1)
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package com.instacart.formula.subjects
import com.instacart.formula.plugin.Plugin
import com.instacart.formula.plugin.Dispatcher

class TestDispatcherPlugin : Plugin {
class TestDispatcherPlugin(val defaultDispatcher: Dispatcher? = null) : Plugin {
val mainDispatcher = IncrementingDispatcher()
val backgroundDispatcher = IncrementingDispatcher()

Expand All @@ -14,4 +14,8 @@ class TestDispatcherPlugin : Plugin {
override fun backgroundThreadDispatcher(): Dispatcher {
return backgroundDispatcher
}

override fun defaultDispatcher(): Dispatcher? {
return defaultDispatcher
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import com.instacart.formula.plugin.Inspector
import com.instacart.formula.coroutines.FlowAction
import com.instacart.formula.coroutines.FlowFormula
import com.instacart.formula.coroutines.toFlow
import com.instacart.formula.plugin.Dispatcher
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
Expand Down Expand Up @@ -36,9 +37,10 @@ object CoroutinesTestableRuntime : TestableRuntime {
override fun <Input : Any, Output : Any, F : IFormula<Input, Output>> test(
formula: F,
inspector: Inspector?,
defaultDispatcher: Dispatcher?,
): TestFormulaObserver<Input, Output, F> {
val scope = coroutineTestRule.testCoroutineScope
val delegate = CoroutineTestDelegate(scope, formula, inspector)
val delegate = CoroutineTestDelegate(scope, formula, inspector, defaultDispatcher)
return TestFormulaObserver(delegate)
}

Expand Down Expand Up @@ -95,13 +97,15 @@ private class CoroutineTestDelegate<Input : Any, Output : Any, FormulaT : IFormu
private val scope: CoroutineScope,
override val formula: FormulaT,
private val inspector: Inspector?,
private val dispatcher: Dispatcher?,
): FormulaTestDelegate<Input, Output, FormulaT> {
private val values = mutableListOf<Output>()
private val errors = mutableListOf<Throwable>()

private val runtimeConfig = RuntimeConfig(
isValidationEnabled = true,
inspector = inspector,
defaultDispatcher = dispatcher,
)
private val inputFlow = MutableSharedFlow<Input>(1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
private val formulaFlow = formula.toFlow(inputFlow, runtimeConfig)
Expand Down
Loading

0 comments on commit 990134b

Please sign in to comment.