-
Notifications
You must be signed in to change notification settings - Fork 14
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
11 changed files
with
286 additions
and
21 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
74 changes: 74 additions & 0 deletions
74
formula/src/main/java/com/instacart/formula/internal/SynchronizedEventQueue.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
package com.instacart.formula.internal | ||
|
||
import java.util.concurrent.ConcurrentLinkedQueue | ||
import java.util.concurrent.atomic.AtomicReference | ||
|
||
/** | ||
* A non-blocking event queue that processes formula updates. | ||
*/ | ||
class SynchronizedEventQueue { | ||
private val threadRunning = AtomicReference<Thread>() | ||
private val concurrentLinkedQueue = ConcurrentLinkedQueue<() -> Unit>() | ||
|
||
/** | ||
* All top-level formula interactions that trigger formula side-effects are posted here | ||
* to make sure that they are executed one at a time. If there is a thread currently running | ||
* formula, we hand the update to that thread for processing. The following | ||
* root formula events are propagated via this queue: | ||
* - Input change | ||
* - Individual formula transitions | ||
* - Termination | ||
* | ||
* Implementation works by having a concurrent queue and checking: | ||
* - If queue is idle, execute current update and try to process other queue entries | ||
* - If queue is running by the same thread, we execute current update and let other | ||
* updates be handled by existing processing loop. | ||
* - If queue is running by a different thread, add to the queue and see if we need to | ||
* take over the processing. | ||
*/ | ||
fun postUpdate(runnable: () -> Unit) { | ||
val currentThread = Thread.currentThread() | ||
val owner = threadRunning.get() | ||
if (owner == currentThread) { | ||
// Since we are on the same thread, just execute the event (no need to grab ownership) | ||
runnable() | ||
} else if (owner == null) { | ||
if (threadRunning.compareAndSet(null, currentThread)) { | ||
// The queue is idle, we first execute our own event and then move to the queue | ||
runnable() | ||
threadRunning.set(null) | ||
|
||
tryToProcessQueueIfNeeded(currentThread) | ||
} else { | ||
concurrentLinkedQueue.add(runnable) | ||
tryToProcessQueueIfNeeded(currentThread) | ||
} | ||
} else { | ||
concurrentLinkedQueue.add(runnable) | ||
tryToProcessQueueIfNeeded(currentThread) | ||
} | ||
} | ||
|
||
private fun tryToProcessQueueIfNeeded(currentThread: Thread) { | ||
while (true) { | ||
// First, we peek to see if there is a value to process. | ||
val peekUpdate = concurrentLinkedQueue.peek() | ||
if (peekUpdate != null) { | ||
// If there is a value to process, we check if we should process it. | ||
if (threadRunning.compareAndSet(null, currentThread)) { | ||
// We successfully set ourselves as the running thread | ||
// We poll the queue to get the latest value (it could have changed). It | ||
// also removes the value from the queue. | ||
val actualUpdate = concurrentLinkedQueue.poll() | ||
actualUpdate?.invoke() | ||
threadRunning.set(null) | ||
} else { | ||
// Some other thread is running, let that thread execute the update. | ||
return | ||
} | ||
} else { | ||
return | ||
} | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.