Skip to content

Commit

Permalink
Do not block on emission.
Browse files Browse the repository at this point in the history
  • Loading branch information
Laimiux committed Feb 26, 2024
1 parent 99a85da commit d9f6708
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 12 deletions.
44 changes: 33 additions & 11 deletions formula/src/main/java/com/instacart/formula/FormulaRuntime.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import com.instacart.formula.internal.ManagerDelegate
import com.instacart.formula.internal.SynchronizedUpdateQueue
import com.instacart.formula.plugin.Dispatcher
import java.util.LinkedList
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference

/**
* Takes a [Formula] and creates an Observable<Output> from it.
Expand All @@ -17,14 +19,15 @@ class FormulaRuntime<Input : Any, Output : Any>(
private val isValidationEnabled: Boolean = false,
inspector: Inspector? = null,
) : ManagerDelegate {
private val synchronizedUpdateQueue = SynchronizedUpdateQueue()
private val synchronizedUpdateQueue = SynchronizedUpdateQueue(
onEmpty = { emitOutputIfNeeded() }
)
private val inspector = FormulaPlugins.inspector(type = formula.type(), local = inspector)
private val implementation = formula.implementation()

@Volatile
private var manager: FormulaManagerImpl<Input, *, Output>? = null

private var emitOutput = false
private var lastOutput: Output? = null
private var input: Input? = null
private var key: Any? = null

Expand Down Expand Up @@ -56,8 +59,15 @@ class FormulaRuntime<Input : Any, Output : Any>(
* this [FormulaRuntime] instance. We will not accept any more [onInput] changes and will
* not emit any new [Output] events.
*/
@Volatile
private var isRuntimeTerminated: Boolean = false

/**
* Pending output to be emitted to the formula subscriber.
*/
private var pendingOutput = AtomicReference<Output>()
private var isEmitting = AtomicBoolean(false)

private fun isKeyValid(input: Input): Boolean {
return this.input == null || key == formula.key(input)
}
Expand Down Expand Up @@ -193,9 +203,6 @@ class FormulaRuntime<Input : Any, Output : Any>(
if (isExecutingEffects) return
executeTransitionEffects()

if (!manager.isTerminated()) {
emitOutputIfNeeded()
}
} catch (e: Throwable) {
isRunning = false

Expand All @@ -210,8 +217,7 @@ class FormulaRuntime<Input : Any, Output : Any>(
*/
private fun runFormula(manager: FormulaManager<Input, Output>, currentInput: Input) {
val result = manager.run(currentInput)
lastOutput = result.output
emitOutput = true
pendingOutput.set(result.output)

if (isValidationEnabled) {
try {
Expand Down Expand Up @@ -252,9 +258,25 @@ class FormulaRuntime<Input : Any, Output : Any>(
* Emits output to the formula subscriber.
*/
private fun emitOutputIfNeeded() {
if (emitOutput && !isRuntimeTerminated) {
emitOutput = false
onOutput(checkNotNull(lastOutput))
var output = pendingOutput.get()
while (output != null) {
if (isEmitting.compareAndSet(false, true)) {
if (!isRuntimeTerminated && manager?.isTerminated() != true) {
onOutput(output)
}

// If it is still our output, we try to clear it
pendingOutput.compareAndSet(output, null)

// Allow others to take on the processing
isEmitting.set(false)

// Check if there is another update and try to process if no-one else has taken over
output = pendingOutput.get()
} else {
// Someone else is emitting the value
output = null
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ internal class FormulaManagerImpl<Input, State, Output>(
* start new actions. And then, [performTerminationSideEffects] is called to clean
* up this [formula] and its child formulas.
*/
@Volatile
private var terminated = false

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import java.util.concurrent.atomic.AtomicReference
* that there is happens-before relationship between each thread and memory changes are visible
* between them.
*/
class SynchronizedUpdateQueue {
class SynchronizedUpdateQueue(
private val onEmpty: (() -> Unit)? = null,
) {
/**
* Defines a thread currently executing formula update. Null value indicates idle queue.
*
Expand Down Expand Up @@ -79,6 +81,7 @@ class SynchronizedUpdateQueue {
return
}
} else {
onEmpty?.invoke()
return
}
}
Expand Down

0 comments on commit d9f6708

Please sign in to comment.