From b3abe8a41f1e09be70eec4723da3c73314164eb0 Mon Sep 17 00:00:00 2001 From: Laimonas Turauskas Date: Thu, 18 Jan 2024 21:35:55 -0500 Subject: [PATCH] Make formula internals thread safe. --- .../formula/coroutines/FlowRuntime.kt | 23 +++-- .../formula/rxjava3/RxJavaRuntime.kt | 23 +++-- .../formula/test/TestFormulaObserver.kt | 2 +- .../com/instacart/formula/FormulaRuntime.kt | 6 +- .../formula/internal/ChildrenManager.kt | 1 + .../formula/internal/FormulaManagerImpl.kt | 2 + .../formula/internal/ListenerImpl.kt | 6 +- .../internal/SynchronizedEventQueue.kt | 74 ++++++++++++++++ .../instacart/formula/FormulaRuntimeTest.kt | 27 +++++- .../formula/subjects/MultiThreadRobot.kt | 88 +++++++++++++++++++ .../formula/subjects/SleepFormula.kt | 55 ++++++++++++ 11 files changed, 286 insertions(+), 21 deletions(-) create mode 100644 formula/src/main/java/com/instacart/formula/internal/SynchronizedEventQueue.kt create mode 100644 formula/src/test/java/com/instacart/formula/subjects/MultiThreadRobot.kt create mode 100644 formula/src/test/java/com/instacart/formula/subjects/SleepFormula.kt diff --git a/formula-coroutines/src/main/java/com/instacart/formula/coroutines/FlowRuntime.kt b/formula-coroutines/src/main/java/com/instacart/formula/coroutines/FlowRuntime.kt index df3fdefe5..b51069e2c 100644 --- a/formula-coroutines/src/main/java/com/instacart/formula/coroutines/FlowRuntime.kt +++ b/formula-coroutines/src/main/java/com/instacart/formula/coroutines/FlowRuntime.kt @@ -4,6 +4,7 @@ import com.instacart.formula.FormulaPlugins import com.instacart.formula.FormulaRuntime import com.instacart.formula.IFormula import com.instacart.formula.Inspector +import com.instacart.formula.internal.SynchronizedEventQueue import com.instacart.formula.internal.ThreadChecker import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.channels.awaitClose @@ -25,15 +26,17 @@ object FlowRuntime { ): Flow { val threadChecker = ThreadChecker(formula) return callbackFlow { - threadChecker.check("Need to subscribe on main thread.") +// threadChecker.check("Need to subscribe on main thread.") val mergedInspector = FormulaPlugins.inspector( type = formula.type(), local = inspector, ) + val queue = SynchronizedEventQueue() val runtimeFactory = { FormulaRuntime( + queue = queue, threadChecker = threadChecker, formula = formula, onOutput = this::trySendBlocking, @@ -46,17 +49,21 @@ object FlowRuntime { var runtime = runtimeFactory() input.onEach { input -> - threadChecker.check("Input arrived on a wrong thread.") - if (!runtime.isKeyValid(input)) { - runtime.terminate() - runtime = runtimeFactory() +// threadChecker.check("Input arrived on a wrong thread.") + queue.postUpdate { + if (!runtime.isKeyValid(input)) { + runtime.terminate() + runtime = runtimeFactory() + } + runtime.onInput(input) } - runtime.onInput(input) }.launchIn(this) awaitClose { - threadChecker.check("Need to unsubscribe on the main thread.") - runtime.terminate() +// threadChecker.check("Need to unsubscribe on the main thread.") + queue.postUpdate { + runtime.terminate() + } } }.distinctUntilChanged() } diff --git a/formula-rxjava3/src/main/java/com/instacart/formula/rxjava3/RxJavaRuntime.kt b/formula-rxjava3/src/main/java/com/instacart/formula/rxjava3/RxJavaRuntime.kt index 09101a172..8f490975d 100644 --- a/formula-rxjava3/src/main/java/com/instacart/formula/rxjava3/RxJavaRuntime.kt +++ b/formula-rxjava3/src/main/java/com/instacart/formula/rxjava3/RxJavaRuntime.kt @@ -4,6 +4,7 @@ import com.instacart.formula.FormulaPlugins import com.instacart.formula.FormulaRuntime import com.instacart.formula.IFormula import com.instacart.formula.Inspector +import com.instacart.formula.internal.SynchronizedEventQueue import com.instacart.formula.internal.ThreadChecker import io.reactivex.rxjava3.core.Observable import io.reactivex.rxjava3.disposables.CompositeDisposable @@ -22,8 +23,10 @@ object RxJavaRuntime { type = formula.type(), local = inspector, ) + val queue = SynchronizedEventQueue() val runtimeFactory = { FormulaRuntime( + queue = queue, threadChecker = threadChecker, formula = formula, onOutput = emitter::onNext, @@ -33,23 +36,27 @@ object RxJavaRuntime { ) } - threadChecker.check("Need to subscribe on main thread.") +// threadChecker.check("Need to subscribe on main thread.") var runtime = runtimeFactory() val disposables = CompositeDisposable() disposables.add(input.subscribe({ input -> - threadChecker.check("Input arrived on a wrong thread.") - if (!runtime.isKeyValid(input)) { - runtime.terminate() - runtime = runtimeFactory() +// threadChecker.check("Input arrived on a wrong thread.") + queue.postUpdate { + if (!runtime.isKeyValid(input)) { + runtime.terminate() + runtime = runtimeFactory() + } + runtime.onInput(input) } - runtime.onInput(input) }, emitter::onError)) val runnable = Runnable { - threadChecker.check("Need to unsubscribe on the main thread.") - runtime.terminate() +// threadChecker.check("Need to unsubscribe on the main thread.") + queue.postUpdate { + runtime.terminate() + } } disposables.add(FormulaDisposableHelper.fromRunnable(runnable)) diff --git a/formula-test/src/main/java/com/instacart/formula/test/TestFormulaObserver.kt b/formula-test/src/main/java/com/instacart/formula/test/TestFormulaObserver.kt index 8958d8373..2eaa1e663 100644 --- a/formula-test/src/main/java/com/instacart/formula/test/TestFormulaObserver.kt +++ b/formula-test/src/main/java/com/instacart/formula/test/TestFormulaObserver.kt @@ -6,7 +6,7 @@ class TestFormulaObserver, ) { - private var started: Boolean = false + @Volatile private var started: Boolean = false val formula: FormulaT = delegate.formula diff --git a/formula/src/main/java/com/instacart/formula/FormulaRuntime.kt b/formula/src/main/java/com/instacart/formula/FormulaRuntime.kt index 1856b0070..834974162 100644 --- a/formula/src/main/java/com/instacart/formula/FormulaRuntime.kt +++ b/formula/src/main/java/com/instacart/formula/FormulaRuntime.kt @@ -3,13 +3,16 @@ package com.instacart.formula import com.instacart.formula.internal.FormulaManager import com.instacart.formula.internal.FormulaManagerImpl import com.instacart.formula.internal.ManagerDelegate +import com.instacart.formula.internal.SynchronizedEventQueue import com.instacart.formula.internal.ThreadChecker import java.util.LinkedList +import java.util.concurrent.atomic.AtomicReference /** * Takes a [Formula] and creates an Observable from it. */ class FormulaRuntime( + private val queue: SynchronizedEventQueue, private val threadChecker: ThreadChecker, private val formula: IFormula, private val onOutput: (Output) -> Unit, @@ -61,6 +64,7 @@ class FormulaRuntime( if (initialization) { manager = FormulaManagerImpl( + queue = queue, delegate = this, formula = implementation, initialInput = input, @@ -95,7 +99,7 @@ class FormulaRuntime( } override fun onPostTransition(effects: Effects?, evaluate: Boolean) { - threadChecker.check("Only thread that created it can post transition result") +// threadChecker.check("Only thread that created it can post transition result") effects?.let { globalEffectQueue.addLast(effects) diff --git a/formula/src/main/java/com/instacart/formula/internal/ChildrenManager.kt b/formula/src/main/java/com/instacart/formula/internal/ChildrenManager.kt index 3fcd9376b..1148893aa 100644 --- a/formula/src/main/java/com/instacart/formula/internal/ChildrenManager.kt +++ b/formula/src/main/java/com/instacart/formula/internal/ChildrenManager.kt @@ -110,6 +110,7 @@ internal class ChildrenManager( val childFormulaHolder = children.findOrInit(key) { val implementation = formula.implementation() FormulaManagerImpl( + queue = delegate.queue, delegate = delegate, formula = implementation, initialInput = input, diff --git a/formula/src/main/java/com/instacart/formula/internal/FormulaManagerImpl.kt b/formula/src/main/java/com/instacart/formula/internal/FormulaManagerImpl.kt index f1c2a998c..f22dfb9f2 100644 --- a/formula/src/main/java/com/instacart/formula/internal/FormulaManagerImpl.kt +++ b/formula/src/main/java/com/instacart/formula/internal/FormulaManagerImpl.kt @@ -18,6 +18,7 @@ import kotlin.reflect.KClass * a state change, it will rerun [Formula.evaluate]. */ internal class FormulaManagerImpl( + val queue: SynchronizedEventQueue, private val delegate: ManagerDelegate, private val formula: Formula, initialInput: Input, @@ -259,6 +260,7 @@ internal class FormulaManagerImpl( } fun onPendingTransition(transition: DeferredTransition<*, *, *>) { + // TODO: we should pass it to the runtime to ensure proper synchronization if (terminated) { transition.execute() } else if (isRunning) { diff --git a/formula/src/main/java/com/instacart/formula/internal/ListenerImpl.kt b/formula/src/main/java/com/instacart/formula/internal/ListenerImpl.kt index 3cf42ca68..886709b9e 100644 --- a/formula/src/main/java/com/instacart/formula/internal/ListenerImpl.kt +++ b/formula/src/main/java/com/instacart/formula/internal/ListenerImpl.kt @@ -18,8 +18,10 @@ internal class ListenerImpl(internal var key: Any) : Liste // TODO: log if null listener (it might be due to formula removal or due to callback removal) val manager = manager ?: return - val deferredTransition = DeferredTransition(this, transition, event) - manager.onPendingTransition(deferredTransition) + manager.queue.postUpdate { + val deferredTransition = DeferredTransition(this, transition, event) + manager.onPendingTransition(deferredTransition) + } } fun disable() { diff --git a/formula/src/main/java/com/instacart/formula/internal/SynchronizedEventQueue.kt b/formula/src/main/java/com/instacart/formula/internal/SynchronizedEventQueue.kt new file mode 100644 index 000000000..5de7b0cc6 --- /dev/null +++ b/formula/src/main/java/com/instacart/formula/internal/SynchronizedEventQueue.kt @@ -0,0 +1,74 @@ +package com.instacart.formula.internal + +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.atomic.AtomicReference + +/** + * A non-blocking event queue that processes formula updates. + */ +class SynchronizedEventQueue { + private val threadRunning = AtomicReference() + private val concurrentLinkedQueue = ConcurrentLinkedQueue<() -> Unit>() + + /** + * All top-level formula interactions that trigger formula side-effects are posted here + * to make sure that they are executed one at a time. If there is a thread currently running + * formula, we hand the update to that thread for processing. The following + * root formula events are propagated via this queue: + * - Input change + * - Individual formula transitions + * - Termination + * + * Implementation works by having a concurrent queue and checking: + * - If queue is idle, execute current update and try to process other queue entries + * - If queue is running by the same thread, we execute current update and let other + * updates be handled by existing processing loop. + * - If queue is running by a different thread, add to the queue and see if we need to + * take over the processing. + */ + fun postUpdate(runnable: () -> Unit) { + val currentThread = Thread.currentThread() + val owner = threadRunning.get() + if (owner == currentThread) { + // Since we are on the same thread, just execute the event (no need to grab ownership) + runnable() + } else if (owner == null) { + if (threadRunning.compareAndSet(null, currentThread)) { + // The queue is idle, we first execute our own event and then move to the queue + runnable() + threadRunning.set(null) + + tryToProcessQueueIfNeeded(currentThread) + } else { + concurrentLinkedQueue.add(runnable) + tryToProcessQueueIfNeeded(currentThread) + } + } else { + concurrentLinkedQueue.add(runnable) + tryToProcessQueueIfNeeded(currentThread) + } + } + + private fun tryToProcessQueueIfNeeded(currentThread: Thread) { + while (true) { + // First, we peek to see if there is a value to process. + val peekUpdate = concurrentLinkedQueue.peek() + if (peekUpdate != null) { + // If there is a value to process, we check if we should process it. + if (threadRunning.compareAndSet(null, currentThread)) { + // We successfully set ourselves as the running thread + // We poll the queue to get the latest value (it could have changed). It + // also removes the value from the queue. + val actualUpdate = concurrentLinkedQueue.poll() + actualUpdate?.invoke() + threadRunning.set(null) + } else { + // Some other thread is running, let that thread execute the update. + return + } + } else { + return + } + } + } +} \ No newline at end of file diff --git a/formula/src/test/java/com/instacart/formula/FormulaRuntimeTest.kt b/formula/src/test/java/com/instacart/formula/FormulaRuntimeTest.kt index 16b8ea9bb..09be328d0 100644 --- a/formula/src/test/java/com/instacart/formula/FormulaRuntimeTest.kt +++ b/formula/src/test/java/com/instacart/formula/FormulaRuntimeTest.kt @@ -35,6 +35,7 @@ import com.instacart.formula.subjects.KeyFormula import com.instacart.formula.subjects.KeyUsingListFormula import com.instacart.formula.subjects.MessageFormula import com.instacart.formula.subjects.MixingCallbackUseWithKeyUse +import com.instacart.formula.subjects.MultiThreadRobot import com.instacart.formula.subjects.MultipleChildEvents import com.instacart.formula.subjects.NestedCallbackCallRobot import com.instacart.formula.subjects.NestedChildTransitionAfterNoEvaluationPass @@ -62,6 +63,7 @@ import com.instacart.formula.subjects.TestKey import com.instacart.formula.subjects.TransitionAfterNoEvaluationPass import com.instacart.formula.subjects.UseInputFormula import com.instacart.formula.subjects.ReusableFunctionCreatesUniqueListeners +import com.instacart.formula.subjects.SleepFormula import com.instacart.formula.subjects.UniqueListenersWithinLoop import com.instacart.formula.subjects.UsingKeyToScopeCallbacksWithinAnotherFunction import com.instacart.formula.subjects.UsingKeyToScopeChildFormula @@ -83,6 +85,9 @@ import org.junit.rules.RuleChain import org.junit.rules.TestName import org.junit.runner.RunWith import org.junit.runners.Parameterized +import java.util.concurrent.CountDownLatch +import java.util.concurrent.Executors +import java.util.concurrent.ThreadFactory import java.util.concurrent.TimeUnit import kotlin.reflect.KClass @@ -646,7 +651,9 @@ class FormulaRuntimeTest(val runtime: TestableRuntime, val name: String) { } val observer = runtime.test(formula, Unit) - bgAction.latch.await(50, TimeUnit.MILLISECONDS) + if (!bgAction.latch.await(50, TimeUnit.MILLISECONDS)) { + throw IllegalStateException("Timeout") + } assertThat(bgAction.errors.values().firstOrNull()?.message).contains( "com.instacart.formula.subjects.OnlyUpdateFormula - Only thread that created it can post transition result Expected:" ) @@ -1294,6 +1301,24 @@ class FormulaRuntimeTest(val runtime: TestableRuntime, val name: String) { .assertValue(0) } + @Test + fun `formula multi thread handoff`() { + with(MultiThreadRobot(runtime)) { + threadA(50) + threadB(10) + awaitCompletion() + threadB(10) + + awaitEvents( + SleepFormula.SleepEvent(50, "thread-a"), + // First thread-b event is handed-off to thread-a + SleepFormula.SleepEvent(10, "thread-a"), + // Second thread-b event is handled by thread-b + SleepFormula.SleepEvent(10, "thread-b") + ) + } + } + @Test fun `inspector events`() { val globalInspector = TestInspector() diff --git a/formula/src/test/java/com/instacart/formula/subjects/MultiThreadRobot.kt b/formula/src/test/java/com/instacart/formula/subjects/MultiThreadRobot.kt new file mode 100644 index 000000000..f98f8f495 --- /dev/null +++ b/formula/src/test/java/com/instacart/formula/subjects/MultiThreadRobot.kt @@ -0,0 +1,88 @@ +package com.instacart.formula.subjects + +import com.google.common.truth.Truth +import com.instacart.formula.test.TestableRuntime +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.CountDownLatch +import java.util.concurrent.Executor +import java.util.concurrent.Executors +import java.util.concurrent.ThreadFactory +import java.util.concurrent.TimeUnit + +class MultiThreadRobot(val runtime: TestableRuntime) { + class NamedThreadFactory(private val name: String): ThreadFactory { + override fun newThread(r: Runnable): Thread { + return Thread(r, name) + } + } + + private val executorA = Executors.newSingleThreadExecutor(NamedThreadFactory("thread-a")) + private val executorB = Executors.newSingleThreadExecutor(NamedThreadFactory("thread-b")) + + private val threadFormula = SleepFormula() + private val observer = runtime.test(threadFormula, Unit) + + @Volatile private var nextEventStartedLatch: CountDownLatch? = null + + private val eventCompletionLatches = ConcurrentLinkedQueue() + + private fun execute(executor: Executor, sleepDuration: Long) { + // Creating a latch and adding it to a list to make sure we are able to + // wait for all event completion. + val completionLatch = CountDownLatch(1) + eventCompletionLatches.add(completionLatch) + + // To ensure predictable order, we wait for previous executor to + // start, before we start ourselves. + val previousLatch = nextEventStartedLatch + if (previousLatch != null) { + await(previousLatch, 100, TimeUnit.MILLISECONDS) + } + + val localStartLatch = CountDownLatch(1) + nextEventStartedLatch = localStartLatch + + executor.execute { + if (previousLatch != null) { + // We give a little extra time for the other executor to pick up the event + Thread.sleep(10) + } + + localStartLatch.countDown() + + observer.output { + this.onSleep(sleepDuration) + + completionLatch.countDown() + } + } + } + + fun threadA(sleepDuration: Long) = apply { + execute(executorA, sleepDuration) + } + + fun threadB(sleepDuration: Long) = apply { + execute(executorB, sleepDuration) + } + + fun awaitCompletion() = apply { + for (latch in eventCompletionLatches) { + await(latch, 1, TimeUnit.SECONDS) + } + } + + fun awaitEvents(vararg sleepEvents: SleepFormula.SleepEvent) = apply { + awaitCompletion() + + observer.output { + Truth.assertThat(this.sleepEvents).containsExactly(*sleepEvents).inOrder() + } + } + + private fun await(latch: CountDownLatch, timeout: Long, unit: TimeUnit) { + if (!latch.await(timeout, unit)) { + throw IllegalStateException("Timeout") + } + } +} \ No newline at end of file diff --git a/formula/src/test/java/com/instacart/formula/subjects/SleepFormula.kt b/formula/src/test/java/com/instacart/formula/subjects/SleepFormula.kt new file mode 100644 index 000000000..08ddecefd --- /dev/null +++ b/formula/src/test/java/com/instacart/formula/subjects/SleepFormula.kt @@ -0,0 +1,55 @@ +package com.instacart.formula.subjects + +import com.instacart.formula.Action +import com.instacart.formula.Evaluation +import com.instacart.formula.Formula +import com.instacart.formula.Snapshot + +class SleepFormula : Formula() { + + data class SleepEvent( + val duration: Long, + val threadName: String, + ) + + data class State( + val sleepEvents: List = emptyList(), + val pendingEvent: SleepEvent? = null, + ) + + data class Output( + val sleepEvents: List, + val onSleep: (Long) -> Unit, + ) + + override fun initialState(input: Unit): State { + return State() + } + + override fun Snapshot.evaluate(): Evaluation { + return Evaluation( + output = Output( + sleepEvents = state.sleepEvents, + onSleep = context.onEvent { + val newEvent = SleepEvent( + duration = it, + threadName = Thread.currentThread().name, + ) + transition(state.copy(pendingEvent = newEvent)) + } + ), + actions = context.actions { + state.pendingEvent?.let { + Action.onData(it).onEvent { event -> + // Using sleep to control multi-threaded events + Thread.sleep(event.duration) + val newState = state.copy( + sleepEvents = state.sleepEvents + event, + ) + transition(newState) + } + } + } + ) + } +} \ No newline at end of file