Skip to content

Commit

Permalink
Merge pull request #832 from simple-robot/pref-jvm-blocking-runner
Browse files Browse the repository at this point in the history
优化 `BlockingRunner` 内部实现
  • Loading branch information
ForteScarlet authored May 29, 2024
2 parents 7feb3f8 + c1cba91 commit 65a846b
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ private inline fun initDispatcher(
dispatcher eq DISPATCHER_USE_CUSTOM_PROPERTY_VALUE -> loadCustomBlockingDispatcher(
Thread.currentThread().contextClassLoader
)

else -> null
}

Expand Down Expand Up @@ -563,12 +564,14 @@ private var runInNoScopeBlockingStrategy: RunInNoScopeBlockingStrategy = Default

@OptIn(ExperimentalSimbotAPI::class)
private object DefaultRunInNoScopeBlockingStrategy : RunInNoScopeBlockingStrategy {
@kotlin.jvm.Throws(Exception::class)
override fun <T> invoke(context: CoroutineContext, block: suspend () -> T): T {
val runner = SuspendRunner<T>(context)
block.startCoroutine(runner)
return runner.await(SuspendRunner.isWaitTimeoutEnabled)
}

@kotlin.jvm.Throws(Exception::class)
fun <T> invokeWithoutTimeoutLog(context: CoroutineContext, block: suspend () -> T): T {
val runner = SuspendRunner<T>(context)
block.startCoroutine(runner)
Expand All @@ -595,31 +598,31 @@ public fun setRunInNoScopeBlockingStrategy(strategy: RunInNoScopeBlockingStrateg
*
* 在默认未提供上下文的情况下,[runInBlocking] 所使用的 [context] 为 [DefaultBlockingContext].
*
* @throws RunInBlockingException [block] 中产生的所有异常(除了 [TimeoutCancellationException])均会被包装在 [RunInBlockingException] 的 `cause` 中被抛出。
* @throws Exception 原函数可能被抛出的任何异常
* @throws RunInBlockingException 当出现执行 [block] 过程中由于 future 或线程中断等非 [block] 本身产生的异常时被包装为 [RunInBlockingException]
*
* @see DefaultBlockingContext
* @see runBlocking
*/
@OptIn(ExperimentalSimbotAPI::class, InternalSimbotAPI::class)
@Throws(RunInBlockingException::class)
@Throws(Exception::class)
public fun <T> runInBlocking(
context: CoroutineContext = DefaultBlockingContext,
block: suspend CoroutineScope.() -> T,
): T = runCatching {
runInBlockingStrategy(context, block)
}.getOrElse { throw `$RunInBlockingException$`(it) }
): T = runInBlockingStrategy(context, block)

/**
* 如果超时,则抛出 [TimeoutCancellationException].
*
* @throws TimeoutCancellationException 如果超时
* @throws RunInBlockingException [block] 中产生的所有异常(除了 [TimeoutCancellationException])均会被包装在 [RunInBlockingException] 的 `cause` 中被抛出。
* @throws Exception 原函数可能被抛出的任何异常
* @throws RunInBlockingException 当出现执行 [block] 过程中由于 future 或线程中断等非 [block] 本身产生的异常时被包装为 [RunInBlockingException]
*
* @see runInBlocking
* @see withTimeout
*/
@OptIn(ExperimentalSimbotAPI::class, InternalSimbotAPI::class)
@Throws(RunInBlockingException::class)
@Throws(Exception::class)
public fun <T> runInTimeoutBlocking(
timeout: Long,
context: CoroutineContext = DefaultBlockingContext,
Expand All @@ -640,46 +643,41 @@ public fun <T> runInTimeoutBlocking(
*
* 在默认未提供上下文的情况下,[runInBlocking] 所使用的 [context] 为 [DefaultBlockingContext].
*
* @throws RunInBlockingException [block] 中产生的所有异常均会被包装在 [RunInBlockingException] 的 `cause` 中被抛出。
*
* @throws Exception 原函数可能被抛出的任何异常
* @throws RunInBlockingException 当出现执行 [block] 过程中由于 future 或线程中断等非 [block] 本身产生的异常时被包装为 [RunInBlockingException]
* @see DefaultBlockingContext
* @see runBlocking
*/
@OptIn(ExperimentalSimbotAPI::class, InternalSimbotAPI::class)
@Throws(RunInBlockingException::class)
@Throws(Exception::class)
public fun <T> runInNoScopeBlocking(
context: CoroutineContext = DefaultBlockingContext,
block: suspend () -> T,
): T = runCatching {
runInNoScopeBlockingStrategy(context, block)
}.getOrElse { throw `$RunInBlockingException$`(it) }
): T = runInNoScopeBlockingStrategy(context, block)

/**
* @suppress 内部API
*
* @throws RunInBlockingException [block] 中产生的所有异常均会被包装在 [RunInBlockingException] 的 `cause` 中被抛出。
* @throws Exception 原函数可能被抛出的任何异常
* @throws RunInBlockingException 当出现执行 [block] 过程中由于 future 或线程中断等非 [block] 本身产生的异常时被包装为 [RunInBlockingException]
*
* @see runInNoScopeBlocking
* @see DefaultBlockingContext
* @see runBlocking
*/
@OptIn(ExperimentalSimbotAPI::class)
@Throws(RunInBlockingException::class)
@Throws(Exception::class)
@InternalSimbotAPI
public fun <T> runInNoScopeBlockingWithoutTimeoutDebug(
context: CoroutineContext = DefaultBlockingContext,
block: suspend () -> T,
): T {
try {
val strategy = runInNoScopeBlockingStrategy
if (strategy is DefaultRunInNoScopeBlockingStrategy) {
return DefaultRunInNoScopeBlockingStrategy.invokeWithoutTimeoutLog(context, block)
}

return strategy(context, block)
} catch (e: Throwable) {
throw `$RunInBlockingException$`(e)
val strategy = runInNoScopeBlockingStrategy
if (strategy is DefaultRunInNoScopeBlockingStrategy) {
return DefaultRunInNoScopeBlockingStrategy.invokeWithoutTimeoutLog(context, block)
}

return strategy(context, block)
}

/**
Expand Down Expand Up @@ -746,7 +744,7 @@ public fun <T> `$$asReserve`(scope: CoroutineScope? = null, block: suspend () ->

@InternalSimbotAPI
@Deprecated("Just used by compiler", level = DeprecationLevel.HIDDEN)
@Throws(RunInBlockingException::class)
@Throws(Exception::class)
public fun <T> `$$runInBlocking`(block: suspend () -> T): T = runInNoScopeBlocking(block = block)


Expand Down Expand Up @@ -784,38 +782,51 @@ public fun <T> `$$runInAsyncNullable`(block: suspend () -> T, scope: CoroutineSc

/**
* 使用在 `runBlocking` 或相关函数中,用于将运行其中的函数所抛出的函数捕获并包装。
* 内容函数抛出的真正异常在 [cause] 中。
*
* [RunInBlockingException] 只会包装那些由 future 或者线程中断导致的异常,
* 实际执行的blocking函数所抛出的异常会被原样抛出。
*
* 通常来讲,[cause] 可能是:
* - [CancellationException]
* - [ExecutionException] (概率很低)
* - [InterruptedException]
*/
public sealed class RunInBlockingException protected constructor(cause: Throwable) : RuntimeException(cause)


@Suppress("unused", "ClassName")
private class `$RunInBlockingException$`(cause: Throwable) : RunInBlockingException(cause)

private fun throwRunInBlockingException(cause: Throwable): Nothing =
throw `$RunInBlockingException$`(cause)

// Yes. I am the BlockingRunner.
private class SuspendRunner<T>(override val context: CoroutineContext = EmptyCoroutineContext) : Continuation<T> {
@Suppress("unused")
@Volatile
var s: Int = 0
var s: Int = SIGNAL_NONE
// 1 = resume - success
// 2 = resume - exception
// 3 = suspend

@Volatile
var value: Any? = null
// 1 -> value
// 2 -> ex
// 3 -> CompletableFuture<T>
@Volatile
var value: Any? = null

private object NULL

override fun resumeWith(result: Result<T>) {
// 先变更信号
val resumed =
signalUpdater.compareAndSet(
this,
SIGNAL_NONE,
if (result.isSuccess) SIGNAL_RESUME_SUCCESS else SIGNAL_RESUME_FAILED
)

// 信号变更失败,说明现在信号是 SUSPEND
// 那么 value 就已经有一个 future 值了,或者它应该被初始化为一个 future
if (!resumed) {
// value is a Future.
@Suppress("UNCHECKED_CAST")
Expand All @@ -839,12 +850,14 @@ private class SuspendRunner<T>(override val context: CoroutineContext = EmptyCor
}

/**
* @param isWaitTimeoutEnabled 是否输出长时间阻塞警告
*
* @see CompletableFuture.join
* @see CompletableFuture.get
* @throws CancellationException cancellation
* @throws CompletionException completion
* @throws ExecutionException if [ExecutionException.cause] is null
* @throws Exception [ExecutionException.cause]
* @throws InterruptedException
* @throws RunInBlockingException
*/
@Suppress("UNCHECKED_CAST", "ReturnCount", "ThrowsCount")
@Throws(Exception::class)
Expand Down Expand Up @@ -875,29 +888,36 @@ private class SuspendRunner<T>(override val context: CoroutineContext = EmptyCor
throw value as Throwable
}

// 不需要检测长时间等待的日志,
// 或者不需要在virtual时输出日志,且当前是virtual thread
if (!isWaitTimeoutEnabled || (!logIfVirtual && Thread.currentThread().isVirtualThread())) {
try {
return future.get()
} catch (cancellation: CancellationException) {
throw cancellation
throwRunInBlockingException(cancellation)
} catch (execution: ExecutionException) {
throw execution.cause ?: execution
// 一般来讲 ExecutionException.cause 不会是 null
throw execution.cause ?: throwRunInBlockingException(execution)
}
// catch (other: Throwable) {
// throw other // CompletionException(other)
// }
// InterruptedException 直接向外传递
}

// 需要输出长时间等待日志

var times = 0
while (!future.isDone && !Thread.currentThread().isInterrupted) {
while (!future.isDone) {
if (Thread.interrupted()) {
throw InterruptedException()
}

if (times > 0) {
val duration = (waitTimeout * times).milliseconds
if (logger.isDebugEnabled) {
val durationString = duration.toString()
logger.warn("Blocking runner has been blocking for at least {}.", durationString)
val e: Throwable = LongTimeBlockingException(durationString)
val e: Throwable = ProlongedBlockingException(durationString)
logger.debug(
"Long time blocking duration at least {}",
"Prolonged blocking duration at least {}",
durationString,
e
)
Expand All @@ -916,21 +936,24 @@ private class SuspendRunner<T>(override val context: CoroutineContext = EmptyCor
} catch (ignore: TimeoutException) {
times += 1
} catch (cancellation: CancellationException) {
throw cancellation
throwRunInBlockingException(cancellation)
} catch (execution: ExecutionException) {
throw execution.cause ?: execution
throw execution.cause ?: throwRunInBlockingException(execution)
}
// catch (other: Throwable) {
// throw other // CompletionException(other)
// }
}

// done.
return future.join()
// Is done, but not in the future.get(timeout)
try {
return future.join()
} catch (cancellation: CancellationException) {
throwRunInBlockingException(cancellation)
} catch (execution: CompletionException) {
throwRunInBlockingException(execution)
}
}

// for displaying the stack only
private class LongTimeBlockingException(message: String) : RuntimeException(message)
// Used only to show the stack
private class ProlongedBlockingException(message: String) : RuntimeException(message)

companion object {
private const val SIGNAL_NONE = 0
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright (c) 2024. ForteScarlet.
*
* Project https://github.com/simple-robot/simpler-robot
* Email [email protected]
*
* This file is part of the Simple Robot Library (Alias: simple-robot, simbot, etc.).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Lesser GNU General Public License for more details.
*
* You should have received a copy of the Lesser GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/

package love.forte.simbot.suspendrunner

import kotlinx.coroutines.delay
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.test.Test
import kotlin.test.assertFails
import kotlin.test.assertIs


/**
*
* @author ForteScarlet
*/
class BlockingRunnerTests {
@Test
fun runBlockingExceptionallyTest() {
runInNoScopeBlocking { runNormally() }

assertIs<RunException>(
assertFails {
runInNoScopeBlocking { runExceptionally1() }
}
)

assertIs<RunException>(
assertFails {
runInNoScopeBlocking { runExceptionally2() }
}
)
}

private suspend fun runNormally() = suspendCancellableCoroutine { continuation ->
continuation.resume(0)
}

private suspend fun runExceptionally1() {
delay(1)
throw RunException()
}

private suspend fun runExceptionally2() = suspendCancellableCoroutine<Int> { continuation ->
continuation.resumeWithException(RunException())
}


private class RunException : RuntimeException()
}

0 comments on commit 65a846b

Please sign in to comment.