diff --git a/scan-framework/src/main/java/com/getbouncer/scan/framework/interop/JavaAnalyzer.kt b/scan-framework/src/main/java/com/getbouncer/scan/framework/interop/BlockingAnalyzer.kt similarity index 59% rename from scan-framework/src/main/java/com/getbouncer/scan/framework/interop/JavaAnalyzer.kt rename to scan-framework/src/main/java/com/getbouncer/scan/framework/interop/BlockingAnalyzer.kt index 42bad9b..e165bb6 100644 --- a/scan-framework/src/main/java/com/getbouncer/scan/framework/interop/JavaAnalyzer.kt +++ b/scan-framework/src/main/java/com/getbouncer/scan/framework/interop/BlockingAnalyzer.kt @@ -4,36 +4,37 @@ import com.getbouncer.scan.framework.Analyzer import com.getbouncer.scan.framework.AnalyzerFactory import com.getbouncer.scan.framework.AnalyzerPool import com.getbouncer.scan.framework.DEFAULT_ANALYZER_PARALLEL_COUNT +import kotlinx.coroutines.runBlocking /** * An implementation of an analyzer that does not use suspending functions. This allows interoperability with java. */ -abstract class JavaAnalyzer : Analyzer { - override suspend fun analyze(data: Input, state: State): Output = analyzeJava(data, state) +abstract class BlockingAnalyzer : Analyzer { + override suspend fun analyze(data: Input, state: State): Output = analyzeBlocking(data, state) - abstract fun analyzeJava(data: Input, state: State): Output + abstract fun analyzeBlocking(data: Input, state: State): Output } /** * An implementation of an analyzer factory that does not use suspending functions. This allows interoperability with * java. */ -abstract class JavaAnalyzerFactory> : AnalyzerFactory { - override suspend fun newInstance(): Output? = newInstanceJava() +abstract class BlockingAnalyzerFactory> : AnalyzerFactory { + override suspend fun newInstance(): Output? = newInstanceBlocking() - abstract fun newInstanceJava(): Output? + abstract fun newInstanceBlocking(): Output? } /** * An implementation of an analyzer pool factory that does not use suspending functions. This allows interoperability * with java. */ -class JavaAnalyzerPoolFactory @JvmOverloads constructor( - private val analyzerFactory: JavaAnalyzerFactory>, +class BlockingAnalyzerPoolFactory @JvmOverloads constructor( + private val analyzerFactory: AnalyzerFactory>, private val desiredAnalyzerCount: Int = DEFAULT_ANALYZER_PARALLEL_COUNT ) { fun buildAnalyzerPool() = AnalyzerPool( desiredAnalyzerCount = desiredAnalyzerCount, - analyzers = (0 until desiredAnalyzerCount).mapNotNull { analyzerFactory.newInstanceJava() } + analyzers = (0 until desiredAnalyzerCount).mapNotNull { runBlocking { analyzerFactory.newInstance() } } ) } diff --git a/scan-framework/src/main/java/com/getbouncer/scan/framework/interop/BlockingResult.kt b/scan-framework/src/main/java/com/getbouncer/scan/framework/interop/BlockingResult.kt new file mode 100644 index 0000000..891e657 --- /dev/null +++ b/scan-framework/src/main/java/com/getbouncer/scan/framework/interop/BlockingResult.kt @@ -0,0 +1,98 @@ +package com.getbouncer.scan.framework.interop + +import com.getbouncer.scan.framework.AggregateResultListener +import com.getbouncer.scan.framework.ResultAggregator +import com.getbouncer.scan.framework.ResultAggregatorConfig +import com.getbouncer.scan.framework.ResultHandler +import com.getbouncer.scan.framework.SavedFrame +import com.getbouncer.scan.framework.StatefulResultHandler +import com.getbouncer.scan.framework.TerminatingResultHandler + +/** + * An implementation of a result handler that does not use suspending functions. This allows interoperability with java. + */ +abstract class BlockingResultHandler : ResultHandler { + override suspend fun onResult(result: Output, data: Input) = onResultBlocking(result, data) + + abstract fun onResultBlocking(result: Output, data: Input): Verdict +} + +/** + * An implementation of a stateful result handler that does not use suspending functions. This allows interoperability + * with java. + */ +abstract class BlockingStatefulResultHandler( + initialState: State +) : StatefulResultHandler(initialState) { + override suspend fun onResult(result: Output, data: Input): Verdict = onResultBlocking(result, data) + + abstract fun onResultBlocking(result: Output, data: Input): Verdict +} + +/** + * An implementation of a terminating result handler that does not use suspending functions. This allows + * interoperability with java. + */ +abstract class BlockingTerminatingResultHandler( + initialState: State +) : TerminatingResultHandler(initialState) { + override suspend fun onResult(result: Output, data: Input) = onResultBlocking(result, data) + + override suspend fun onTerminatedEarly() = onTerminatedEarlyBlocking() + + override suspend fun onAllDataProcessed() = onAllDataProcessedBlocking() + + abstract fun onResultBlocking(result: Output, data: Input) + + abstract fun onTerminatedEarlyBlocking() + + abstract fun onAllDataProcessedBlocking() +} + +/** + * An implementation of a result listener that does not use suspending functions. This allows interoperability with + * java. + */ +abstract class BlockingAggregateResultListener : + AggregateResultListener { + override suspend fun onInterimResult(result: InterimResult, state: State, frame: DataFrame) = + onInterimResultBlocking(result, state, frame) + + override suspend fun onResult( + result: FinalResult, + frames: Map>> + ) = onResultBlocking(result, frames) + + override suspend fun onReset() = onResetBlocking() + + abstract fun onInterimResultBlocking(result: InterimResult, state: State, frame: DataFrame) + + abstract fun onResultBlocking( + result: FinalResult, + frames: Map>> + ) + + abstract fun onResetBlocking() +} + +/** + * An implementation of a result aggregator that does not use suspending functions. This allows interoperability with + * java. + */ +abstract class BlockingResultAggregator( + config: ResultAggregatorConfig, + listener: AggregateResultListener, + initialState: State +) : ResultAggregator(config, listener, initialState) { + override suspend fun aggregateResult( + result: AnalyzerResult, + startAggregationTimer: () -> Unit, + mustReturnFinal: Boolean + ): Pair = aggregateResultBlocking(result, startAggregationTimer, mustReturnFinal) + + abstract fun aggregateResultBlocking( + result: AnalyzerResult, + startAggregationTimer: () -> Unit, + mustReturnFinal: Boolean + ): Pair +} diff --git a/scan-framework/src/main/java/com/getbouncer/scan/framework/interop/JavaContinuation.kt b/scan-framework/src/main/java/com/getbouncer/scan/framework/interop/JavaContinuation.kt index 3a87101..827ab73 100644 --- a/scan-framework/src/main/java/com/getbouncer/scan/framework/interop/JavaContinuation.kt +++ b/scan-framework/src/main/java/com/getbouncer/scan/framework/interop/JavaContinuation.kt @@ -13,6 +13,7 @@ abstract class JavaContinuation @JvmOverloads constructor( runOn: CoroutineContext = Dispatchers.Default, private val listenOn: CoroutineContext = Dispatchers.Main ) : Continuation { + override val context: CoroutineContext = runOn abstract fun onComplete(value: T) abstract fun onException(exception: Throwable) override fun resumeWith(result: Result) = result.fold( @@ -27,5 +28,4 @@ abstract class JavaContinuation @JvmOverloads constructor( } } ) - override val context: CoroutineContext = runOn } diff --git a/scan-framework/src/main/java/com/getbouncer/scan/framework/interop/JavaResult.kt b/scan-framework/src/main/java/com/getbouncer/scan/framework/interop/JavaResult.kt deleted file mode 100644 index 25fee15..0000000 --- a/scan-framework/src/main/java/com/getbouncer/scan/framework/interop/JavaResult.kt +++ /dev/null @@ -1,38 +0,0 @@ -package com.getbouncer.scan.framework.interop - -import com.getbouncer.scan.framework.ResultHandler -import com.getbouncer.scan.framework.StatefulResultHandler -import com.getbouncer.scan.framework.TerminatingResultHandler - -/** - * An implementation of a result handler that does not use suspending functions. This allows interoperability with java. - */ -abstract class JavaResultHandler : ResultHandler { - override suspend fun onResult(result: Output, data: Input) = onResultJava(result, data) - - abstract fun onResultJava(result: Output, data: Input): Verdict -} - -/** - * An implementation of a stateful result handler that does not use suspending functions. This allows interoperability - * with java. - */ -abstract class JavaStatefulResultHandler( - initialState: State -) : StatefulResultHandler(initialState) { - override suspend fun onResult(result: Output, data: Input): Verdict = onResultJava(result, data) - - abstract fun onResultJava(result: Output, data: Input): Verdict -} - -/** - * An implementation of a terminating result handler that does not use suspending functions. This allows - * interoperability with java. - */ -abstract class JavaTerminatingResultHandler( - initialState: State -) : TerminatingResultHandler(initialState) { - override suspend fun onResult(result: Output, data: Input) = onResultJava(result, data) - - abstract fun onResultJava(result: Output, data: Input) -} diff --git a/scan-framework/src/main/java/com/getbouncer/scan/framework/time/Timer.kt b/scan-framework/src/main/java/com/getbouncer/scan/framework/time/Timer.kt index 1f390e2..3193551 100644 --- a/scan-framework/src/main/java/com/getbouncer/scan/framework/time/Timer.kt +++ b/scan-framework/src/main/java/com/getbouncer/scan/framework/time/Timer.kt @@ -23,9 +23,7 @@ sealed class Timer { } } - fun measure(taskName: String? = null, task: () -> T): T = runBlocking { - measureSuspend { task() } - } + fun measure(taskName: String? = null, task: () -> T): T = runBlocking { measureSuspend(taskName) { task() } } abstract suspend fun measureSuspend(taskName: String? = null, task: suspend () -> T): T } diff --git a/scan-framework/src/test/java/com/getbouncer/scan/framework/interop/BlockingAnalyzerTest.java b/scan-framework/src/test/java/com/getbouncer/scan/framework/interop/BlockingAnalyzerTest.java new file mode 100644 index 0000000..55e147c --- /dev/null +++ b/scan-framework/src/test/java/com/getbouncer/scan/framework/interop/BlockingAnalyzerTest.java @@ -0,0 +1,144 @@ +package com.getbouncer.scan.framework.interop; + +import androidx.test.filters.MediumTest; + +import com.getbouncer.scan.framework.Analyzer; +import com.getbouncer.scan.framework.AnalyzerFactory; +import com.getbouncer.scan.framework.AnalyzerPool; +import com.getbouncer.scan.framework.AnalyzerPoolFactory; + +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.junit.Assert; +import org.junit.Test; + +import kotlin.coroutines.Continuation; +import kotlin.jvm.functions.Function2; +import kotlinx.coroutines.BuildersKt; +import kotlinx.coroutines.CoroutineScope; +import kotlinx.coroutines.CoroutineStart; +import kotlinx.coroutines.Deferred; +import kotlinx.coroutines.Dispatchers; +import kotlinx.coroutines.GlobalScope; + +public class BlockingAnalyzerTest { + + @Test(timeout = 1000) + @MediumTest + public void blockingAnalyzer_works() throws InterruptedException { + final Analyzer analyzer = new BlockingAnalyzer() { + @NotNull + @Override + public String getName() { + return "test_blocking_analyzer"; + } + + @Override + public Boolean analyzeBlocking(Integer data, Boolean state) { + return data > 0 && state; + } + }; + + final Deferred deferred = BuildersKt.async( + GlobalScope.INSTANCE, + Dispatchers.getDefault(), + CoroutineStart.DEFAULT, + new Function2, Object>() { + @Override + public Object invoke(CoroutineScope coroutineScope, Continuation continuation) { + return analyzer.analyze(1, true, continuation); + } + } + ); + + while (!deferred.isCompleted()) { + Thread.sleep(100); + } + + Assert.assertTrue(deferred.getCompleted()); + } + + @Test(timeout = 1000) + @MediumTest + public void blockingAnalyzerFactory_works() throws InterruptedException { + final AnalyzerFactory> factory = + new BlockingAnalyzerFactory>() { + @Nullable + @Override + public Analyzer newInstanceBlocking() { + return new Analyzer() { + @NotNull + @Override + public String getName() { + return "test_analyzer"; + } + + @Nullable + @Override + public Object analyze( + Integer data, + Boolean state, + @NotNull Continuation $completion + ) { + return null; + } + }; + } + }; + + final Deferred> deferred = BuildersKt.async( + GlobalScope.INSTANCE, + Dispatchers.getDefault(), + CoroutineStart.DEFAULT, + new Function2>, Object>() { + @Override + public Object invoke( + CoroutineScope coroutineScope, + Continuation> continuation + ) { + return factory.newInstance(continuation); + } + } + ); + + while (!deferred.isCompleted()) { + Thread.sleep(100); + } + + Assert.assertNotNull(deferred.getCompleted()); + } + + @Test + @MediumTest + public void blockingAnalyzerPoolFactory_works() throws InterruptedException { + final AnalyzerFactory> factory = + new BlockingAnalyzerFactory>() { + @Nullable + @Override + public Analyzer newInstanceBlocking() { + return new Analyzer() { + @NotNull + @Override + public String getName() { + return "test_analyzer"; + } + + @Nullable + @Override + public Object analyze( + Integer data, + Boolean state, + @NotNull Continuation $completion + ) { + return null; + } + }; + } + }; + + final BlockingAnalyzerPoolFactory poolFactory = + new BlockingAnalyzerPoolFactory<>(factory); + + Assert.assertNotNull(poolFactory.buildAnalyzerPool()); + } +} diff --git a/scan-framework/src/test/java/com/getbouncer/scan/framework/interop/BlockingResultTest.java b/scan-framework/src/test/java/com/getbouncer/scan/framework/interop/BlockingResultTest.java new file mode 100644 index 0000000..a4f59e2 --- /dev/null +++ b/scan-framework/src/test/java/com/getbouncer/scan/framework/interop/BlockingResultTest.java @@ -0,0 +1,336 @@ +package com.getbouncer.scan.framework.interop; + +import androidx.test.filters.MediumTest; + +import com.getbouncer.scan.framework.AggregateResultListener; +import com.getbouncer.scan.framework.ResultAggregator; +import com.getbouncer.scan.framework.ResultAggregatorConfig; +import com.getbouncer.scan.framework.ResultHandler; +import com.getbouncer.scan.framework.SavedFrame; +import com.getbouncer.scan.framework.StatefulResultHandler; +import com.getbouncer.scan.framework.TerminatingResultHandler; + +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.junit.Test; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import kotlin.Pair; +import kotlin.Unit; +import kotlin.coroutines.Continuation; +import kotlin.jvm.functions.Function0; +import kotlin.jvm.functions.Function2; +import kotlinx.coroutines.BuildersKt; +import kotlinx.coroutines.CoroutineScope; +import kotlinx.coroutines.CoroutineStart; +import kotlinx.coroutines.Deferred; +import kotlinx.coroutines.Dispatchers; +import kotlinx.coroutines.GlobalScope; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class BlockingResultTest { + + private static class TerminatingTestResult { + public boolean handledResult = false; + public boolean handledAllResults = false; + public boolean terminatedEarly = false; + } + + private static class AggregateTestResult { + public boolean handledInterim = false; + public boolean handledFinal = false; + public boolean handledReset = false; + } + + @Test(timeout = 1000) + @MediumTest + public void blockingResultHandler_works() throws InterruptedException { + final ResultHandler resultHandler = + new BlockingResultHandler() { + @Override + public Boolean onResultBlocking(Integer result, Integer data) { + return result != null && result.equals(data); + } + }; + + final Deferred deferred = BuildersKt.async( + GlobalScope.INSTANCE, + Dispatchers.getDefault(), + CoroutineStart.DEFAULT, + new Function2, Object>() { + @Override + public Object invoke(CoroutineScope coroutineScope, Continuation continuation) { + return resultHandler.onResult(2, 2, continuation); + } + } + ); + + while (!deferred.isCompleted()) { + Thread.sleep(100); + } + + assertTrue(deferred.getCompleted()); + } + + @Test(timeout = 1000) + @MediumTest + public void blockingStatefulResultHandler_works() throws InterruptedException { + final StatefulResultHandler resultHandler = + new BlockingStatefulResultHandler(true) { + @Override + public Boolean onResultBlocking(Integer result, Integer data) { + return result != null && result.equals(data) && getState(); + } + }; + + final Deferred deferred = BuildersKt.async( + GlobalScope.INSTANCE, + Dispatchers.getDefault(), + CoroutineStart.DEFAULT, + new Function2, Object>() { + @Override + public Object invoke(CoroutineScope coroutineScope, Continuation continuation) { + return resultHandler.onResult(2, 2, continuation); + } + } + ); + + while (!deferred.isCompleted()) { + Thread.sleep(100); + } + + assertTrue(deferred.getCompleted()); + } + + @Test(timeout = 1000) + @MediumTest + public void blockingTerminatingResultHandler_works() throws InterruptedException { + final TerminatingTestResult testResult = new TerminatingTestResult(); + + final TerminatingResultHandler resultHandler = + new BlockingTerminatingResultHandler(true) { + @Override + public void onAllDataProcessedBlocking() { + testResult.handledAllResults = true; + } + + @Override + public void onTerminatedEarlyBlocking() { + testResult.terminatedEarly = true; + } + + @Override + public void onResultBlocking(Integer result, Integer data) { + testResult.handledResult = true; + } + }; + + final Deferred ranAllDataProcessed = BuildersKt.async( + GlobalScope.INSTANCE, + Dispatchers.getDefault(), + CoroutineStart.DEFAULT, + new Function2, Object>() { + @Override + public Object invoke(CoroutineScope coroutineScope, Continuation continuation) { + return resultHandler.onAllDataProcessed(continuation); + } + } + ); + + final Deferred ranTerminatedEarly = BuildersKt.async( + GlobalScope.INSTANCE, + Dispatchers.getDefault(), + CoroutineStart.DEFAULT, + new Function2, Object>() { + @Override + public Object invoke(CoroutineScope coroutineScope, Continuation continuation) { + return resultHandler.onTerminatedEarly(continuation); + } + } + ); + + final Deferred ranResult = BuildersKt.async( + GlobalScope.INSTANCE, + Dispatchers.getDefault(), + CoroutineStart.DEFAULT, + new Function2, Object>() { + @Override + public Object invoke(CoroutineScope coroutineScope, Continuation continuation) { + return resultHandler.onResult(2, 2, continuation); + } + } + ); + + while (!testResult.handledResult || + !testResult.handledAllResults || + !testResult.terminatedEarly || + !ranAllDataProcessed.isCompleted() || + !ranTerminatedEarly.isCompleted() || + !ranResult.isCompleted() + ) { + Thread.sleep(100); + } + } + + @Test(timeout = 1000L) + @MediumTest + public void blockingAggregateResultListener_works() throws InterruptedException { + final AggregateTestResult testResult = new AggregateTestResult(); + + final AggregateResultListener resultListener = + new BlockingAggregateResultListener() { + @Override + public void onInterimResultBlocking(Integer result, Boolean state, Integer frame) { + testResult.handledInterim = true; + } + + @Override + public void onResultBlocking( + Boolean result, + @NotNull Map>> frames + ) { + testResult.handledFinal = true; + } + + @Override + public void onResetBlocking() { + testResult.handledReset = true; + } + }; + + final Deferred deferredInterim = BuildersKt.async( + GlobalScope.INSTANCE, + Dispatchers.getDefault(), + CoroutineStart.DEFAULT, + new Function2, Object>() { + @Override + public Object invoke(CoroutineScope coroutineScope, Continuation continuation) { + return resultListener.onInterimResult(1, true, 2, continuation); + } + } + ); + + final Deferred deferredResult = BuildersKt.async( + GlobalScope.INSTANCE, + Dispatchers.getDefault(), + CoroutineStart.DEFAULT, + new Function2, Object>() { + @Override + public Object invoke(CoroutineScope coroutineScope, Continuation continuation) { + return resultListener.onResult( + true, + Collections.>>emptyMap(), + continuation + ); + } + } + ); + + final Deferred deferredReset = BuildersKt.async( + GlobalScope.INSTANCE, + Dispatchers.getDefault(), + CoroutineStart.DEFAULT, + new Function2, Object>() { + @Override + public Object invoke(CoroutineScope coroutineScope, Continuation continuation) { + return resultListener.onReset(continuation); + } + } + ); + + while (!testResult.handledInterim || + !testResult.handledFinal || + !testResult.handledReset || + !deferredInterim.isCompleted() || + !deferredResult.isCompleted() || + !deferredReset.isCompleted() + ) { + Thread.sleep(100); + } + } + + @Test(timeout = 1000L) + @MediumTest + public void blockingResultAggregator_works() throws InterruptedException { + final ResultAggregatorConfig config = new ResultAggregatorConfig.Builder().build(); + + final AggregateResultListener listener = + new BlockingAggregateResultListener() { + @Override + public void onInterimResultBlocking(Integer result, Boolean state, Integer frame) { } + + @Override + public void onResultBlocking( + Boolean result, + @NotNull Map>> frames + ) { } + + @Override + public void onResetBlocking() { } + }; + + final ResultAggregator resultAggregator = + new BlockingResultAggregator(config, listener, true) { + @Override + public int getFrameSizeBytes(Integer frame) { + return 0; + } + + @Nullable + @Override + public String getSaveFrameIdentifier(Integer result, Integer frame) { + return null; + } + + @NotNull + @Override + public Pair aggregateResultBlocking( + Integer result, + @NotNull Function0 startAggregationTimer, + boolean mustReturnFinal + ) { + return new Pair<>(1, true); + } + + @NotNull + @Override + protected String getName() { + return "test_blocking_result_aggregator"; + } + }; + + final Deferred> deferred = BuildersKt.async( + GlobalScope.INSTANCE, + Dispatchers.getDefault(), + CoroutineStart.DEFAULT, + new Function2>, Object>() { + @Override + public Object invoke(CoroutineScope coroutineScope, Continuation> continuation) { + return resultAggregator.aggregateResult( + 1, + new Function0() { + @Override + public Unit invoke() { + return Unit.INSTANCE; + } + }, + true, + continuation + ); + } + } + ); + + while (!deferred.isCompleted()) { + Thread.sleep(100); + } + + assertEquals(1, (int) deferred.getCompleted().getFirst()); + assertTrue(deferred.getCompleted().getSecond()); + } +}