Skip to content
This repository has been archived by the owner on Jul 18, 2020. It is now read-only.

Commit

Permalink
Merge pull request #57 from getbouncer/awushensky/add_more_java_interop
Browse files Browse the repository at this point in the history
Add more java interoperability
  • Loading branch information
awushensky authored Jul 14, 2020
2 parents bd8c11b + ea84b93 commit d1c82b9
Show file tree
Hide file tree
Showing 7 changed files with 590 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,36 +4,37 @@ import com.getbouncer.scan.framework.Analyzer
import com.getbouncer.scan.framework.AnalyzerFactory
import com.getbouncer.scan.framework.AnalyzerPool
import com.getbouncer.scan.framework.DEFAULT_ANALYZER_PARALLEL_COUNT
import kotlinx.coroutines.runBlocking

/**
* An implementation of an analyzer that does not use suspending functions. This allows interoperability with java.
*/
abstract class JavaAnalyzer<Input, State, Output> : Analyzer<Input, State, Output> {
override suspend fun analyze(data: Input, state: State): Output = analyzeJava(data, state)
abstract class BlockingAnalyzer<Input, State, Output> : Analyzer<Input, State, Output> {
override suspend fun analyze(data: Input, state: State): Output = analyzeBlocking(data, state)

abstract fun analyzeJava(data: Input, state: State): Output
abstract fun analyzeBlocking(data: Input, state: State): Output
}

/**
* An implementation of an analyzer factory that does not use suspending functions. This allows interoperability with
* java.
*/
abstract class JavaAnalyzerFactory<Output : Analyzer<*, *, *>> : AnalyzerFactory<Output> {
override suspend fun newInstance(): Output? = newInstanceJava()
abstract class BlockingAnalyzerFactory<Output : Analyzer<*, *, *>> : AnalyzerFactory<Output> {
override suspend fun newInstance(): Output? = newInstanceBlocking()

abstract fun newInstanceJava(): Output?
abstract fun newInstanceBlocking(): Output?
}

/**
* An implementation of an analyzer pool factory that does not use suspending functions. This allows interoperability
* with java.
*/
class JavaAnalyzerPoolFactory<DataFrame, State, Output> @JvmOverloads constructor(
private val analyzerFactory: JavaAnalyzerFactory<out Analyzer<DataFrame, State, Output>>,
class BlockingAnalyzerPoolFactory<DataFrame, State, Output> @JvmOverloads constructor(
private val analyzerFactory: AnalyzerFactory<out Analyzer<DataFrame, State, Output>>,
private val desiredAnalyzerCount: Int = DEFAULT_ANALYZER_PARALLEL_COUNT
) {
fun buildAnalyzerPool() = AnalyzerPool(
desiredAnalyzerCount = desiredAnalyzerCount,
analyzers = (0 until desiredAnalyzerCount).mapNotNull { analyzerFactory.newInstanceJava() }
analyzers = (0 until desiredAnalyzerCount).mapNotNull { runBlocking { analyzerFactory.newInstance() } }
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package com.getbouncer.scan.framework.interop

import com.getbouncer.scan.framework.AggregateResultListener
import com.getbouncer.scan.framework.ResultAggregator
import com.getbouncer.scan.framework.ResultAggregatorConfig
import com.getbouncer.scan.framework.ResultHandler
import com.getbouncer.scan.framework.SavedFrame
import com.getbouncer.scan.framework.StatefulResultHandler
import com.getbouncer.scan.framework.TerminatingResultHandler

/**
* An implementation of a result handler that does not use suspending functions. This allows interoperability with java.
*/
abstract class BlockingResultHandler<Input, Output, Verdict> : ResultHandler<Input, Output, Verdict> {
override suspend fun onResult(result: Output, data: Input) = onResultBlocking(result, data)

abstract fun onResultBlocking(result: Output, data: Input): Verdict
}

/**
* An implementation of a stateful result handler that does not use suspending functions. This allows interoperability
* with java.
*/
abstract class BlockingStatefulResultHandler<Input, State, Output, Verdict>(
initialState: State
) : StatefulResultHandler<Input, State, Output, Verdict>(initialState) {
override suspend fun onResult(result: Output, data: Input): Verdict = onResultBlocking(result, data)

abstract fun onResultBlocking(result: Output, data: Input): Verdict
}

/**
* An implementation of a terminating result handler that does not use suspending functions. This allows
* interoperability with java.
*/
abstract class BlockingTerminatingResultHandler<Input, State, Output>(
initialState: State
) : TerminatingResultHandler<Input, State, Output>(initialState) {
override suspend fun onResult(result: Output, data: Input) = onResultBlocking(result, data)

override suspend fun onTerminatedEarly() = onTerminatedEarlyBlocking()

override suspend fun onAllDataProcessed() = onAllDataProcessedBlocking()

abstract fun onResultBlocking(result: Output, data: Input)

abstract fun onTerminatedEarlyBlocking()

abstract fun onAllDataProcessedBlocking()
}

/**
* An implementation of a result listener that does not use suspending functions. This allows interoperability with
* java.
*/
abstract class BlockingAggregateResultListener<DataFrame, State, InterimResult, FinalResult> :
AggregateResultListener<DataFrame, State, InterimResult, FinalResult> {
override suspend fun onInterimResult(result: InterimResult, state: State, frame: DataFrame) =
onInterimResultBlocking(result, state, frame)

override suspend fun onResult(
result: FinalResult,
frames: Map<String, List<SavedFrame<DataFrame, State, InterimResult>>>
) = onResultBlocking(result, frames)

override suspend fun onReset() = onResetBlocking()

abstract fun onInterimResultBlocking(result: InterimResult, state: State, frame: DataFrame)

abstract fun onResultBlocking(
result: FinalResult,
frames: Map<String, List<SavedFrame<DataFrame, State, InterimResult>>>
)

abstract fun onResetBlocking()
}

/**
* An implementation of a result aggregator that does not use suspending functions. This allows interoperability with
* java.
*/
abstract class BlockingResultAggregator<DataFrame, State, AnalyzerResult, InterimResult, FinalResult>(
config: ResultAggregatorConfig,
listener: AggregateResultListener<DataFrame, State, InterimResult, FinalResult>,
initialState: State
) : ResultAggregator<DataFrame, State, AnalyzerResult, InterimResult, FinalResult>(config, listener, initialState) {
override suspend fun aggregateResult(
result: AnalyzerResult,
startAggregationTimer: () -> Unit,
mustReturnFinal: Boolean
): Pair<InterimResult, FinalResult?> = aggregateResultBlocking(result, startAggregationTimer, mustReturnFinal)

abstract fun aggregateResultBlocking(
result: AnalyzerResult,
startAggregationTimer: () -> Unit,
mustReturnFinal: Boolean
): Pair<InterimResult, FinalResult?>
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ abstract class JavaContinuation<in T> @JvmOverloads constructor(
runOn: CoroutineContext = Dispatchers.Default,
private val listenOn: CoroutineContext = Dispatchers.Main
) : Continuation<T> {
override val context: CoroutineContext = runOn
abstract fun onComplete(value: T)
abstract fun onException(exception: Throwable)
override fun resumeWith(result: Result<T>) = result.fold(
Expand All @@ -27,5 +28,4 @@ abstract class JavaContinuation<in T> @JvmOverloads constructor(
}
}
)
override val context: CoroutineContext = runOn
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ sealed class Timer {
}
}

fun <T> measure(taskName: String? = null, task: () -> T): T = runBlocking {
measureSuspend { task() }
}
fun <T> measure(taskName: String? = null, task: () -> T): T = runBlocking { measureSuspend(taskName) { task() } }

abstract suspend fun <T> measureSuspend(taskName: String? = null, task: suspend () -> T): T
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package com.getbouncer.scan.framework.interop;

import androidx.test.filters.MediumTest;

import com.getbouncer.scan.framework.Analyzer;
import com.getbouncer.scan.framework.AnalyzerFactory;
import com.getbouncer.scan.framework.AnalyzerPool;
import com.getbouncer.scan.framework.AnalyzerPoolFactory;

import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.junit.Assert;
import org.junit.Test;

import kotlin.coroutines.Continuation;
import kotlin.jvm.functions.Function2;
import kotlinx.coroutines.BuildersKt;
import kotlinx.coroutines.CoroutineScope;
import kotlinx.coroutines.CoroutineStart;
import kotlinx.coroutines.Deferred;
import kotlinx.coroutines.Dispatchers;
import kotlinx.coroutines.GlobalScope;

public class BlockingAnalyzerTest {

@Test(timeout = 1000)
@MediumTest
public void blockingAnalyzer_works() throws InterruptedException {
final Analyzer<Integer, Boolean, Boolean> analyzer = new BlockingAnalyzer<Integer, Boolean, Boolean>() {
@NotNull
@Override
public String getName() {
return "test_blocking_analyzer";
}

@Override
public Boolean analyzeBlocking(Integer data, Boolean state) {
return data > 0 && state;
}
};

final Deferred<Boolean> deferred = BuildersKt.async(
GlobalScope.INSTANCE,
Dispatchers.getDefault(),
CoroutineStart.DEFAULT,
new Function2<CoroutineScope, Continuation<? super Boolean>, Object>() {
@Override
public Object invoke(CoroutineScope coroutineScope, Continuation<? super Boolean> continuation) {
return analyzer.analyze(1, true, continuation);
}
}
);

while (!deferred.isCompleted()) {
Thread.sleep(100);
}

Assert.assertTrue(deferred.getCompleted());
}

@Test(timeout = 1000)
@MediumTest
public void blockingAnalyzerFactory_works() throws InterruptedException {
final AnalyzerFactory<Analyzer<Integer, Boolean, Boolean>> factory =
new BlockingAnalyzerFactory<Analyzer<Integer, Boolean, Boolean>>() {
@Nullable
@Override
public Analyzer<Integer, Boolean, Boolean> newInstanceBlocking() {
return new Analyzer<Integer, Boolean, Boolean>() {
@NotNull
@Override
public String getName() {
return "test_analyzer";
}

@Nullable
@Override
public Object analyze(
Integer data,
Boolean state,
@NotNull Continuation<? super Boolean> $completion
) {
return null;
}
};
}
};

final Deferred<Analyzer<Integer, Boolean, Boolean>> deferred = BuildersKt.async(
GlobalScope.INSTANCE,
Dispatchers.getDefault(),
CoroutineStart.DEFAULT,
new Function2<CoroutineScope, Continuation<? super Analyzer<Integer, Boolean, Boolean>>, Object>() {
@Override
public Object invoke(
CoroutineScope coroutineScope,
Continuation<? super Analyzer<Integer, Boolean, Boolean>> continuation
) {
return factory.newInstance(continuation);
}
}
);

while (!deferred.isCompleted()) {
Thread.sleep(100);
}

Assert.assertNotNull(deferred.getCompleted());
}

@Test
@MediumTest
public void blockingAnalyzerPoolFactory_works() throws InterruptedException {
final AnalyzerFactory<Analyzer<Integer, Boolean, Boolean>> factory =
new BlockingAnalyzerFactory<Analyzer<Integer, Boolean, Boolean>>() {
@Nullable
@Override
public Analyzer<Integer, Boolean, Boolean> newInstanceBlocking() {
return new Analyzer<Integer, Boolean, Boolean>() {
@NotNull
@Override
public String getName() {
return "test_analyzer";
}

@Nullable
@Override
public Object analyze(
Integer data,
Boolean state,
@NotNull Continuation<? super Boolean> $completion
) {
return null;
}
};
}
};

final BlockingAnalyzerPoolFactory<Integer, Boolean, Boolean> poolFactory =
new BlockingAnalyzerPoolFactory<>(factory);

Assert.assertNotNull(poolFactory.buildAnalyzerPool());
}
}
Loading

0 comments on commit d1c82b9

Please sign in to comment.