Skip to content

Commit

Permalink
Refactoring + adding new tests to increase code coverage. (#351)
Browse files Browse the repository at this point in the history
* Add coverage for RuntimeConfig.

* Formula plugin coverage.

* Miscellaneous coverage improvements.

* Increase runtime coverage.

* More coverage improvements.

* PR feedback.
  • Loading branch information
Laimiux authored Mar 15, 2024
1 parent 66b58e1 commit fe5cce8
Show file tree
Hide file tree
Showing 13 changed files with 367 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ object FlowRuntime {
formula = formula,
onOutput = this::trySendBlocking,
onError = this::close,
config = config,
config = config ?: RuntimeConfig(),
)

input.onEach(runtime::onInput).launchIn(this)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ object RxJavaRuntime {
formula = formula,
onOutput = emitter::onNext,
onError = emitter::onError,
config = config,
config = config ?: RuntimeConfig(),
)

val disposables = CompositeDisposable()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ abstract class ActionBuilder<out Input, State>(
*/
abstract fun <Event> events(
action: Action<Event>,
executionType: Transition.ExecutionType? = null,
executionType: Transition.ExecutionType?,
transition: Transition<Input, State, Event>,
)

Expand Down
99 changes: 54 additions & 45 deletions formula/src/main/java/com/instacart/formula/FormulaRuntime.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ class FormulaRuntime<Input : Any, Output : Any>(
private val formula: IFormula<Input, Output>,
private val onOutput: (Output) -> Unit,
private val onError: (Throwable) -> Unit,
config: RuntimeConfig?,
config: RuntimeConfig,
) : ManagerDelegate, BatchManager.Executor {
private val isValidationEnabled = config?.isValidationEnabled ?: false
private val isValidationEnabled = config.isValidationEnabled
private val inspector = FormulaPlugins.inspector(
type = formula.type(),
local = config?.inspector,
local = config.inspector,
)
private val defaultDispatcher: Dispatcher = config?.defaultDispatcher ?: FormulaPlugins.defaultDispatcher()
private val defaultDispatcher: Dispatcher = config.defaultDispatcher ?: FormulaPlugins.defaultDispatcher()
private val implementation = formula.implementation()
private val synchronizedUpdateQueue = SynchronizedUpdateQueue(
onEmpty = { emitOutputIfNeeded() }
Expand Down Expand Up @@ -67,7 +67,7 @@ class FormulaRuntime<Input : Any, Output : Any>(
/**
* Global transition effect queue which executes side-effects after all formulas are idle.
*/
private var globalEffectQueue = LinkedList<Effect>()
private val globalEffectQueue = LinkedList<Effect>()

/**
* Determines if we are iterating through [globalEffectQueue]. It prevents us from
Expand Down Expand Up @@ -166,12 +166,9 @@ class FormulaRuntime<Input : Any, Output : Any>(
}
}

if (effects.isNotEmpty() || evaluate) {
if (isRunEnabled) {
run(evaluate = evaluate)
} else {
pendingEvaluation = pendingEvaluation || evaluate
}
pendingEvaluation = pendingEvaluation || evaluate
if (isRunEnabled) {
runIfNeeded()
}
}

Expand All @@ -190,61 +187,62 @@ class FormulaRuntime<Input : Any, Output : Any>(
*/
isRunEnabled = true

if (globalEffectQueue.isNotEmpty() || pendingEvaluation) {
val evaluate = pendingEvaluation
pendingEvaluation = false
run(evaluate = evaluate)
}
runIfNeeded()
}
}
}

private fun runIfNeeded() {
if (globalEffectQueue.isNotEmpty() || pendingEvaluation) {
val evaluate = pendingEvaluation
pendingEvaluation = false
run(evaluate = evaluate)
}
}

/**
* Performs the evaluation and execution phases.
*/
private fun run(evaluate: Boolean = true) {
if (isRunning) return

try {
val manager = checkNotNull(manager)
val manager = requireManager()

if (evaluate) {
var shouldRun = true
while (shouldRun) {
val localInputId = inputId
if (!manager.isTerminated()) {
isRunning = true
inspector?.onRunStarted(true)
isRunning = true
inspector?.onRunStarted(true)

val currentInput = checkNotNull(input)
runFormula(manager, currentInput)
isRunning = false
val currentInput = requireInput()
runFormula(manager, currentInput)
isRunning = false

inspector?.onRunFinished()
inspector?.onRunFinished()

/**
* If termination happened during runFormula() execution, let's perform
* termination side-effects here.
*/
if (manager.isTerminated()) {
shouldRun = false
terminateManager(manager)

/**
* If termination happened during runFormula() execution, let's perform
* termination side-effects here.
* If runtime has been terminated, there is nothing else to do.
*/
if (manager.isTerminated()) {
shouldRun = false
terminateManager(manager)

// If runtime has been terminated, we are stopping and do
// not need to do anything else.
if (!isRuntimeTerminated) {
// Terminated manager with input change indicates that formula
// key changed and we are resetting formula state. We need to
// start a new formula manager.
if (localInputId != inputId) {
input?.let(this::startNewManager)
}
}
} else {
shouldRun = localInputId != inputId
if (!isRuntimeTerminated) {
/**
* Terminated manager with non-terminated runtime indicates that
* formula input has triggered a key change and we are resetting
* formula state. We need to start a new formula manager here.
*/
startNewManager(requireInput())
}
} else {
shouldRun = false
shouldRun = localInputId != inputId
}
}
}
Expand All @@ -255,9 +253,10 @@ class FormulaRuntime<Input : Any, Output : Any>(
} catch (e: Throwable) {
isRunning = false

manager?.markAsTerminated()
val manager = requireManager()
manager.markAsTerminated()
onError(e)
manager?.let(this::terminateManager)
manager.let(this::terminateManager)
}
}

Expand Down Expand Up @@ -359,4 +358,14 @@ class FormulaRuntime<Input : Any, Output : Any>(
defaultDispatcher = defaultDispatcher,
)
}

// Visible for testing
internal fun requireInput(): Input {
return checkNotNull(input)
}

// Visible for testing
internal fun requireManager(): FormulaManagerImpl<Input, *, Output> {
return checkNotNull(manager)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,7 @@ internal class ChildrenManager(
fun prepareForPostEvaluation() {
indexes?.clear()

children?.clearUnrequested {
pendingRemoval = pendingRemoval ?: mutableListOf()
it.markAsTerminated()
pendingRemoval?.add(it)
}
children?.clearUnrequested(this::prepareForTermination)
}

fun terminateChildren(evaluationId: Long): Boolean {
Expand Down Expand Up @@ -95,6 +91,12 @@ internal class ChildrenManager(
}
}

private fun prepareForTermination(it: FormulaManager<*, *>) {
pendingRemoval = pendingRemoval ?: mutableListOf()
it.markAsTerminated()
pendingRemoval?.add(it)
}

private fun <ChildInput, ChildOutput> childFormulaHolder(
key: Any,
formula: IFormula<ChildInput, ChildOutput>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ internal class FormulaManagerImpl<Input, State, Output>(
* each formula output with an identifier value and compare it for validity with
* the global value.
*/
var globalEvaluationId: Long = 0
private var globalEvaluationId: Long = 0

/**
* Determines if we are executing within [run] block. Enables optimizations
Expand Down Expand Up @@ -194,7 +194,6 @@ internal class FormulaManagerImpl<Input, State, Output>(
val snapshot = SnapshotImpl(
input = input,
state = state,
associatedEvaluationId = evaluationId,
listeners = listeners,
delegate = this,
)
Expand All @@ -220,7 +219,7 @@ internal class FormulaManagerImpl<Input, State, Output>(
listeners.prepareForPostEvaluation()
childrenManager?.prepareForPostEvaluation()

snapshot.running = true
snapshot.markRunning()
if (!isValidationEnabled) {
inspector?.onEvaluateFinished(loggingType, newFrame.evaluation.output, evaluated = true)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,7 @@ internal class Listeners {
*/
fun prepareForPostEvaluation() {
indexes?.clear()

listeners?.clearUnrequested {
// TODO log that disabled listener was invoked.
it.disable()
}
listeners?.clearUnrequested(this::disableListener)
}

fun disableAll() {
Expand All @@ -50,6 +46,10 @@ internal class Listeners {
listeners?.clear()
}

private fun disableListener(listener: ListenerImpl<*, *, *>) {
listener.disable()
}

/**
* Function which returns next index for a given key. It will
* mutate the [indexes] map.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ internal class SingleRequestHolder<T>(val value: T) {

internal typealias SingleRequestMap<Key, Value> = MutableMap<Key, SingleRequestHolder<Value>>

internal inline fun <Value> SingleRequestMap<*, Value>.clearUnrequested(onUnrequested: (Value) -> Unit) {
internal fun <Value> SingleRequestMap<*, Value>.clearUnrequested(onUnrequested: (Value) -> Unit) {
val callbackIterator = this.iterator()
while (callbackIterator.hasNext()) {
val callback = callbackIterator.next()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,18 @@ import com.instacart.formula.Listener
import com.instacart.formula.Snapshot
import com.instacart.formula.Transition
import com.instacart.formula.TransitionContext
import com.instacart.formula.plugin.Dispatcher
import java.lang.IllegalStateException
import kotlin.reflect.KClass

internal class SnapshotImpl<out Input, State> internal constructor(
override val input: Input,
override val state: State,
private val associatedEvaluationId: Long,
listeners: Listeners,
private val delegate: FormulaManagerImpl<Input, State, *>,
) : FormulaContext<Input, State>(listeners), Snapshot<Input, State>, TransitionContext<Input, State> {

private var scopeKey: Any? = null
var running = false
private var running = false

override val context: FormulaContext<Input, State> = this

Expand Down Expand Up @@ -88,15 +86,6 @@ internal class SnapshotImpl<out Input, State> internal constructor(
}

fun <Event> dispatch(transition: Transition<Input, State, Event>, event: Event) {
if (!running) {
throw IllegalStateException("Transitions are not allowed during evaluation")
}

if (!delegate.isTerminated() && delegate.isEvaluationNeeded(associatedEvaluationId)) {
// We have already transitioned, this should not happen.
throw IllegalStateException("Transition already happened. This is using old event listener: $transition & $event. Transition: $associatedEvaluationId != ${delegate.globalEvaluationId}")
}

val result = transition.toResult(this, event)
if (TransitionUtils.isEmpty(result)) {
return
Expand All @@ -105,6 +94,10 @@ internal class SnapshotImpl<out Input, State> internal constructor(
delegate.handleTransitionResult(event, result)
}

fun markRunning() {
running = true
}

private fun ensureNotRunning() {
if (running) {
throw IllegalStateException("Cannot call this transition after evaluation finished. See https://instacart.github.io/formula/faq/#after-evaluation-finished")
Expand Down
Loading

0 comments on commit fe5cce8

Please sign in to comment.