diff --git a/formula/src/main/java/com/instacart/formula/FormulaRuntime.kt b/formula/src/main/java/com/instacart/formula/FormulaRuntime.kt index 4d60bb8e..6c995cdf 100644 --- a/formula/src/main/java/com/instacart/formula/FormulaRuntime.kt +++ b/formula/src/main/java/com/instacart/formula/FormulaRuntime.kt @@ -6,6 +6,8 @@ import com.instacart.formula.internal.ManagerDelegate import com.instacart.formula.internal.SynchronizedUpdateQueue import com.instacart.formula.plugin.Dispatcher import java.util.LinkedList +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicReference /** * Takes a [Formula] and creates an Observable from it. @@ -17,14 +19,15 @@ class FormulaRuntime( private val isValidationEnabled: Boolean = false, inspector: Inspector? = null, ) : ManagerDelegate { - private val synchronizedUpdateQueue = SynchronizedUpdateQueue() + private val synchronizedUpdateQueue = SynchronizedUpdateQueue( + onEmpty = { emitOutputIfNeeded() } + ) private val inspector = FormulaPlugins.inspector(type = formula.type(), local = inspector) private val implementation = formula.implementation() + @Volatile private var manager: FormulaManagerImpl? = null - private var emitOutput = false - private var lastOutput: Output? = null private var input: Input? = null private var key: Any? = null @@ -56,8 +59,15 @@ class FormulaRuntime( * this [FormulaRuntime] instance. We will not accept any more [onInput] changes and will * not emit any new [Output] events. */ + @Volatile private var isRuntimeTerminated: Boolean = false + /** + * Pending output to be emitted to the formula subscriber. + */ + private var pendingOutput = AtomicReference() + private var isEmitting = AtomicBoolean(false) + private fun isKeyValid(input: Input): Boolean { return this.input == null || key == formula.key(input) } @@ -193,9 +203,6 @@ class FormulaRuntime( if (isExecutingEffects) return executeTransitionEffects() - if (!manager.isTerminated()) { - emitOutputIfNeeded() - } } catch (e: Throwable) { isRunning = false @@ -210,8 +217,7 @@ class FormulaRuntime( */ private fun runFormula(manager: FormulaManager, currentInput: Input) { val result = manager.run(currentInput) - lastOutput = result.output - emitOutput = true + pendingOutput.set(result.output) if (isValidationEnabled) { try { @@ -252,9 +258,25 @@ class FormulaRuntime( * Emits output to the formula subscriber. */ private fun emitOutputIfNeeded() { - if (emitOutput && !isRuntimeTerminated) { - emitOutput = false - onOutput(checkNotNull(lastOutput)) + var output = pendingOutput.get() + while (output != null) { + if (isEmitting.compareAndSet(false, true)) { + if (!isRuntimeTerminated && manager?.isTerminated() != true) { + onOutput(output) + } + + // If it is still our output, we try to clear it + pendingOutput.compareAndSet(output, null) + + // Allow others to take on the processing + isEmitting.set(false) + + // Check if there is another update and try to process if no-one else has taken over + output = pendingOutput.get() + } else { + // Someone else is emitting the value + output = null + } } } diff --git a/formula/src/main/java/com/instacart/formula/internal/FormulaManagerImpl.kt b/formula/src/main/java/com/instacart/formula/internal/FormulaManagerImpl.kt index 9a6f7f86..83681e73 100644 --- a/formula/src/main/java/com/instacart/formula/internal/FormulaManagerImpl.kt +++ b/formula/src/main/java/com/instacart/formula/internal/FormulaManagerImpl.kt @@ -44,6 +44,7 @@ internal class FormulaManagerImpl( * start new actions. And then, [performTerminationSideEffects] is called to clean * up this [formula] and its child formulas. */ + @Volatile private var terminated = false /** diff --git a/formula/src/main/java/com/instacart/formula/internal/SynchronizedUpdateQueue.kt b/formula/src/main/java/com/instacart/formula/internal/SynchronizedUpdateQueue.kt index b3975ce8..3f812ec1 100644 --- a/formula/src/main/java/com/instacart/formula/internal/SynchronizedUpdateQueue.kt +++ b/formula/src/main/java/com/instacart/formula/internal/SynchronizedUpdateQueue.kt @@ -13,7 +13,9 @@ import java.util.concurrent.atomic.AtomicReference * that there is happens-before relationship between each thread and memory changes are visible * between them. */ -class SynchronizedUpdateQueue { +class SynchronizedUpdateQueue( + private val onEmpty: (() -> Unit)? = null, +) { /** * Defines a thread currently executing formula update. Null value indicates idle queue. * @@ -79,6 +81,7 @@ class SynchronizedUpdateQueue { return } } else { + onEmpty?.invoke() return } }