diff --git a/settings.gradle.kts b/settings.gradle.kts index e3f4a607f..67284a669 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -54,6 +54,7 @@ if (!System.getenv("IS_CI").toBoolean()) { include(":simbot-component-http-server-api") } +//include(":simbot-project-tests:simbot-project-test-j21") @Suppress("NOTHING_TO_INLINE") diff --git a/simboot-core-spring-boot-starter/src/main/kotlin/love/forte/simboot/spring/autoconfigure/application/SpringBootApplication.kt b/simboot-core-spring-boot-starter/src/main/kotlin/love/forte/simboot/spring/autoconfigure/application/SpringBootApplication.kt index 03bd85649..6db16cf7b 100644 --- a/simboot-core-spring-boot-starter/src/main/kotlin/love/forte/simboot/spring/autoconfigure/application/SpringBootApplication.kt +++ b/simboot-core-spring-boot-starter/src/main/kotlin/love/forte/simboot/spring/autoconfigure/application/SpringBootApplication.kt @@ -12,6 +12,7 @@ package love.forte.simboot.spring.autoconfigure.application +import kotlinx.coroutines.Job import love.forte.simbot.ExperimentalSimbotApi import love.forte.simbot.application.* import love.forte.simbot.core.application.* @@ -47,7 +48,7 @@ public object SpringBoot : val configuration = SpringBootApplicationConfiguration().also(configurator) return create(configuration, builder) } - + /** * 直接提供配置类进行构建。 */ @@ -109,7 +110,7 @@ public interface SpringBootApplication : Application */ public interface SpringBootApplicationBuilder : ApplicationBuilder, EventProcessableApplicationBuilder { - + /** * 配置内部的 core listener manager. * @@ -126,8 +127,8 @@ private class SpringBootApplicationBuilderImpl : SpringBootApplicationBuilder, BaseApplicationBuilder() { private var listenerManagerConfigurator: SimpleListenerManagerConfiguration.(environment: Application.Environment) -> Unit = {} - - + + /** * 配置内部的 listener manager. */ @@ -135,57 +136,57 @@ private class SpringBootApplicationBuilderImpl : SpringBootApplicationBuilder, val old = listenerManagerConfigurator listenerManagerConfigurator = { env -> old(env); configurator(env) } } - + private fun buildListenerManager( appConfig: SpringBootApplicationConfiguration, environment: Application.Environment, ): SimpleEventListenerManager { val initial = SimpleListenerManagerConfiguration { - // TODO job? - coroutineContext = appConfig.coroutineContext + // Init context from app context (without Job) + coroutineContext = appConfig.coroutineContext.minusKey(Job) } - + return simpleListenerManager(initial = initial, block = fun SimpleListenerManagerConfiguration.() { listenerManagerConfigurator(environment) }) } - - + + @OptIn(ExperimentalSimbotApi::class) @Suppress("DuplicatedCode") suspend fun build(configuration: SpringBootApplicationConfiguration): SpringBootApplication { val components = buildComponents() - + val logger = configuration.logger - + val environment = SpringBootEnvironment( components, logger, configuration.coroutineContext ) - + logger.debug("Building listener manager...") val listenerManager = buildListenerManager(configuration, environment) logger.debug("Listener manager is built: {}", listenerManager) - - + + logger.debug("Building providers...") val providers = buildProviders(listenerManager, components, configuration) logger.info("The size of providers built is {}", providers.size) if (providers.isNotEmpty()) { logger.debug("The built providers: {}", providers) } - + val application = SpringBootApplicationImpl(configuration, environment, listenerManager, providers) // set application attribute listenerManager.globalScopeContext[ApplicationAttributes.Application] = application - + // complete. complete(application) - + // region register bots // after complete. logger.debug("Registering bots...") val bots = registerBots(providers) - + logger.info("Bots all registered. The size of bots: {}", bots.size) if (bots.isNotEmpty()) { logger.debug("The all registered bots: {}", bots) @@ -193,21 +194,21 @@ private class SpringBootApplicationBuilderImpl : SpringBootApplicationBuilder, val isAutoStartBots = configuration.isAutoStartBots logger.debug("Auto start bots: {}", isAutoStartBots) if (isAutoStartBots && bots.isNotEmpty()) { - bots.forEach { bot -> - logger.info("Starting bot {}", bot) - val started = bot.start() - logger.info("Bot [{}] started: {}", bot, started) - } + bots.forEach { bot -> + logger.info("Starting bot {}", bot) + val started = bot.start() + logger.info("Bot [{}] started: {}", bot, started) + } } - + if (isAutoStartBots && bots.isEmpty()) { logger.debug("But the registered bots are empty.") } // endregion - + return application } - + } @@ -218,7 +219,7 @@ private class SpringBootApplicationImpl( providerList: List, ) : SpringBootApplication, BaseApplication() { override val providers: List = providerList.view() - + override val coroutineContext = environment.coroutineContext override val logger: Logger = environment.logger } diff --git a/simbot-api/src/main/kotlin/love/forte/simbot/application/ApplicationFactory.kt b/simbot-api/src/main/kotlin/love/forte/simbot/application/ApplicationFactory.kt index 63e445ed7..bceec013d 100644 --- a/simbot-api/src/main/kotlin/love/forte/simbot/application/ApplicationFactory.kt +++ b/simbot-api/src/main/kotlin/love/forte/simbot/application/ApplicationFactory.kt @@ -12,13 +12,16 @@ package love.forte.simbot.application +import kotlinx.coroutines.asCoroutineDispatcher import love.forte.plugin.suspendtrans.annotation.JvmBlocking +import love.forte.simbot.Api4J import love.forte.simbot.Component import love.forte.simbot.ComponentFactory import love.forte.simbot.ability.CompletionPerceivable import love.forte.simbot.bot.Bot import love.forte.simbot.bot.BotVerifyInfo import org.slf4j.Logger +import java.util.concurrent.Executor import kotlin.coroutines.CoroutineContext import kotlin.coroutines.EmptyCoroutineContext @@ -33,7 +36,7 @@ public interface ApplicationFactory< Builder : ApplicationBuilder, A : Application, > { - + /** * 提供配置函数和构建器函数,构建一个 [Application] 实例。 */ @@ -51,7 +54,7 @@ public interface ApplicationFactory< * @param A 目标 [Application] 类型 */ public interface ApplicationBuilder : CompletionPerceivable { - + /** * 注册一个 [组件][Component]. */ @@ -60,7 +63,7 @@ public interface ApplicationBuilder : CompletionPerceivable componentFactory: ComponentFactory, configurator: Config.(perceivable: CompletionPerceivable) -> Unit = {}, ) - + /** * 注册一个事件提供者。 */ @@ -69,15 +72,15 @@ public interface ApplicationBuilder : CompletionPerceivable eventProviderFactory: EventProviderFactory, configurator: Config.(perceivable: CompletionPerceivable) -> Unit = {}, ) - - + + /** * 提供一个可以使用 [BotVerifyInfo] 进行通用性bot注册的配置方式。 */ @ApplicationBuilderDsl public fun bots(registrar: suspend BotRegistrar.() -> Unit) - - + + /** * 注册一个当 [Application] 构建完成后的回调函数。 * @@ -91,7 +94,7 @@ public interface ApplicationBuilder : CompletionPerceivable */ @ApplicationBuilderDsl override fun onCompletion(handle: suspend (application: A) -> Unit) - + } @@ -105,7 +108,7 @@ public interface ApplicationBuilder : CompletionPerceivable * */ public interface BotRegistrar { - + /** * 当前环境中的所有事件提供者。 * @@ -124,8 +127,8 @@ public interface BotRegistrar { * */ public val providers: List - - + + /** * 通过 [BotVerifyInfo] 中的 [组件信息][BotVerifyInfo.componentId] * 去当前环境中寻找对应组件的、实现了 [Bot注册器][love.forte.simbot.bot.BotRegistrar] 的 [事件提供者][EventProvider], @@ -150,7 +153,7 @@ public annotation class ApplicationBuilderDsl * 整个应用程序进行构建所需的基本配置信息。 */ public open class ApplicationConfiguration { - + /** * 当前application内所使用的协程上下文。 * @@ -159,10 +162,21 @@ public open class ApplicationConfiguration { * */ public open var coroutineContext: CoroutineContext = EmptyCoroutineContext - + /** * 提供一个用于Application内部的日志对象。 */ - public open var logger: Logger = love.forte.simbot.logger.LoggerFactory.getLogger("love.forte.simbot.application.ApplicationConfiguration") - + public open var logger: Logger = + love.forte.simbot.logger.LoggerFactory.getLogger("love.forte.simbot.application.ApplicationConfiguration") + + /** + * 向 [coroutineContext] 中应用一个 [Executor]。 + * 面向Java开发者的友好API,在Java中可以更方便的指定一个调度器。 + * + * @since 3.3.0 + */ + @Api4J + public open fun appendExecutorToCoroutineContext(executor: Executor) { + coroutineContext += executor.asCoroutineDispatcher() + } } diff --git a/simbot-core/src/main/kotlin/love/forte/simbot/core/application/BaseStandardApplicationBuilder.kt b/simbot-core/src/main/kotlin/love/forte/simbot/core/application/BaseStandardApplicationBuilder.kt index 42730a09b..fb48a6b2b 100644 --- a/simbot-core/src/main/kotlin/love/forte/simbot/core/application/BaseStandardApplicationBuilder.kt +++ b/simbot-core/src/main/kotlin/love/forte/simbot/core/application/BaseStandardApplicationBuilder.kt @@ -12,6 +12,7 @@ package love.forte.simbot.core.application +import kotlinx.coroutines.Job import love.forte.simbot.application.Application import love.forte.simbot.application.ApplicationBuilder import love.forte.simbot.application.ApplicationBuilderDsl @@ -71,7 +72,7 @@ public abstract class BaseStandardApplicationBuilder : BaseAppl environment: Application.Environment, ): SimpleEventListenerManager { val initial = SimpleListenerManagerConfiguration { - coroutineContext = appConfig.coroutineContext + coroutineContext = appConfig.coroutineContext.minusKey(Job) } return simpleListenerManager(initial = initial) { diff --git a/simbot-core/src/main/kotlin/love/forte/simbot/core/event/SimpleListenerManagerConfiguration.kt b/simbot-core/src/main/kotlin/love/forte/simbot/core/event/SimpleListenerManagerConfiguration.kt index 8a92d8ec5..bde25d3dd 100644 --- a/simbot-core/src/main/kotlin/love/forte/simbot/core/event/SimpleListenerManagerConfiguration.kt +++ b/simbot-core/src/main/kotlin/love/forte/simbot/core/event/SimpleListenerManagerConfiguration.kt @@ -12,11 +12,16 @@ package love.forte.simbot.core.event +import kotlinx.coroutines.Job +import kotlinx.coroutines.asCoroutineDispatcher import kotlinx.serialization.modules.EmptySerializersModule import kotlinx.serialization.modules.SerializersModule import love.forte.simbot.* import love.forte.simbot.event.* import love.forte.simbot.event.EventListenerRegistrationDescription.Companion.toRegistrationDescription +import love.forte.simbot.logger.LoggerFactory +import love.forte.simbot.logger.logger +import java.util.concurrent.Executor import java.util.function.Function import kotlin.coroutines.CoroutineContext import kotlin.coroutines.EmptyCoroutineContext @@ -43,7 +48,24 @@ public open class SimpleListenerManagerConfiguration { */ @SimpleEventManagerConfigDSL public var coroutineContext: CoroutineContext = EmptyCoroutineContext - + set(value) { + if (value[Job] != null) { + logger.warn("Job in SimpleListenerManagerConfiguration.coroutineContext will be dropped.") + } + field = value.minusKey(Job) + } + + /** + * 向 [coroutineContext] 中应用一个 [Executor]。 + * 面向Java开发者的友好API,在Java中可以更方便的指定一个调度器。 + * + * @since 3.3.0 + */ + @Api4J + public open fun appendExecutorToCoroutineContext(executor: Executor) { + coroutineContext += executor.asCoroutineDispatcher() + } + /** * 事件流程拦截器的列表。 */ @@ -266,6 +288,8 @@ public open class SimpleListenerManagerConfiguration { public inline operator fun invoke(block: SimpleListenerManagerConfiguration.() -> Unit): SimpleListenerManagerConfiguration { return SimpleListenerManagerConfiguration().also(block) } + + private val logger = LoggerFactory.logger() } } diff --git a/simbot-project-tests/simbot-project-test-j21/build.gradle.kts b/simbot-project-tests/simbot-project-test-j21/build.gradle.kts new file mode 100644 index 000000000..261f3ee42 --- /dev/null +++ b/simbot-project-tests/simbot-project-test-j21/build.gradle.kts @@ -0,0 +1,51 @@ + +/* + * Copyright (c) 2023 ForteScarlet. + * + * This file is part of Simple Robot. + * + * Simple Robot is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. + * + * Simple Robot is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License along with Simple Robot. If not, see . + */ + +plugins { + kotlin("jvm") // version "1.9.20-Beta2" + +} + + +tasks.getByName("test") { + useJUnitPlatform() +} + +dependencies { + implementation(project(":simbot-util-suspend-transformer")) + testImplementation(kotlin("test-junit5")) +} + +tasks.withType().configureEach { + kotlinOptions { + javaParameters = true + jvmTarget = "21" + freeCompilerArgs = freeCompilerArgs + listOf("-Xjvm-default=all") + } +} + +kotlin { + explicitApi() + this.sourceSets.configureEach { + languageSettings { + optIn("kotlin.RequiresOptIn") + } + } +} + + +tasks.withType { + sourceCompatibility = "21" + targetCompatibility = "21" + options.encoding = "UTF-8" +} diff --git a/simbot-project-tests/simbot-project-test-j21/src/test/kotlin/VisDisp.kt b/simbot-project-tests/simbot-project-test-j21/src/test/kotlin/VisDisp.kt new file mode 100644 index 000000000..7c18854e9 --- /dev/null +++ b/simbot-project-tests/simbot-project-test-j21/src/test/kotlin/VisDisp.kt @@ -0,0 +1,15 @@ +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.asCoroutineDispatcher +import love.forte.simbot.utils.CustomBlockingDispatcherProvider +import java.util.concurrent.Executors + +/** + * + * @author ForteScarlet + */ +class VisDisp : CustomBlockingDispatcherProvider() { + override fun blockingDispatcher(): CoroutineDispatcher { + return Executors.newVirtualThreadPerTaskExecutor().asCoroutineDispatcher() + } + +} diff --git a/simbot-project-tests/simbot-project-test-j21/src/test/kotlin/t/BlockingRunnerTests.kt b/simbot-project-tests/simbot-project-test-j21/src/test/kotlin/t/BlockingRunnerTests.kt new file mode 100644 index 000000000..dd23bb41c --- /dev/null +++ b/simbot-project-tests/simbot-project-test-j21/src/test/kotlin/t/BlockingRunnerTests.kt @@ -0,0 +1,55 @@ +package t + +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.suspendCancellableCoroutine +import love.forte.simbot.utils.runInBlocking +import love.forte.simbot.utils.runInNoScopeBlocking +import kotlin.coroutines.resume + + +fun main() { + System.setProperty("simbot.blockingRunner.waitTimeoutMilliseconds", "2000") +// System.setProperty("simbot.runInBlocking.dispatcher", "custom") + System.setProperty("simbot.runInBlocking.dispatcher", "forkJoinPool") +// runInNoScopeBlocking { +// delay(5000) +// println("你好") +// } +// +// +// val value = runInNoScopeBlocking { +// delay(5000) +// kotlin.random.Random.nextLong() +// } +// +// println("value = $value") + + val value2 = runInBlocking { + println("T[1]" + Thread.currentThread()) + coroutineScope { + println("T[2]" + Thread.currentThread()) + suspendCancellableCoroutine { c -> + println("T[3]" + Thread.currentThread()) + launch { + println("T[4]" + Thread.currentThread()) + delay(20) + c.resume(kotlin.random.Random.nextLong()) + println("T[5]" + Thread.currentThread()) + } + } + println("T[6]" + Thread.currentThread()) + } + println("T[7]" + Thread.currentThread()) + } + + println("value2 = $value2") + +// runInNoScopeBlocking { +// delay(5000) +// throw RuntimeException() +// } + + println("Done.") +} diff --git a/simbot-project-tests/simbot-project-test-j21/src/test/resources/META-INF/services/love.forte.simbot.utils.CustomBlockingDispatcherProvider b/simbot-project-tests/simbot-project-test-j21/src/test/resources/META-INF/services/love.forte.simbot.utils.CustomBlockingDispatcherProvider new file mode 100644 index 000000000..c04fb1139 --- /dev/null +++ b/simbot-project-tests/simbot-project-test-j21/src/test/resources/META-INF/services/love.forte.simbot.utils.CustomBlockingDispatcherProvider @@ -0,0 +1 @@ +VisDisp diff --git a/simbot-util-api-requestor-ktor/build.gradle.kts b/simbot-util-api-requestor-ktor/build.gradle.kts index fc6f855bb..cd7f306ee 100644 --- a/simbot-util-api-requestor-ktor/build.gradle.kts +++ b/simbot-util-api-requestor-ktor/build.gradle.kts @@ -50,9 +50,9 @@ kotlin { // https://ktor.io/docs/client-supported-platforms.html val supportedPlatforms = setOf( // iOS - "iosArm32", "iosArm64", "iosX64", "iosSimulatorArm64", + "iosArm64", "iosX64", "iosSimulatorArm64", // watchOS - "watchosArm32", "watchosArm64", "watchosX86", "watchosX64", "watchosSimulatorArm64", + "watchosArm32", "watchosArm64", "watchosX64", "watchosSimulatorArm64", // tvOS "tvosArm64", "tvosX64", "tvosSimulatorArm64", // macOS diff --git a/simbot-util-suspend-transformer/build.gradle.kts b/simbot-util-suspend-transformer/build.gradle.kts index e18715770..b8e610c9b 100644 --- a/simbot-util-suspend-transformer/build.gradle.kts +++ b/simbot-util-suspend-transformer/build.gradle.kts @@ -76,6 +76,13 @@ kotlin { api(libs.kotlinx.coroutines.jdk8) } } + + getByName("jvmTest") { + dependencies { + api(project(":simbot-logger-slf4j-impl")) + } + } + getByName("jsMain") { dependencies { api(libs.kotlinx.coroutines.core) diff --git a/simbot-util-suspend-transformer/src/jvmMain/kotlin/love/forte/simbot/utils/BlockingRunner.kt b/simbot-util-suspend-transformer/src/jvmMain/kotlin/love/forte/simbot/utils/BlockingRunner.kt index 0c5e2f571..b0a654615 100644 --- a/simbot-util-suspend-transformer/src/jvmMain/kotlin/love/forte/simbot/utils/BlockingRunner.kt +++ b/simbot-util-suspend-transformer/src/jvmMain/kotlin/love/forte/simbot/utils/BlockingRunner.kt @@ -21,12 +21,15 @@ import love.forte.simbot.ExperimentalSimbotApi import love.forte.simbot.FragileSimbotApi import love.forte.simbot.InternalSimbotApi import love.forte.simbot.logger.LoggerFactory +import java.lang.invoke.MethodHandles +import java.util.* import java.util.concurrent.* +import java.util.concurrent.CancellationException +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater import java.util.concurrent.atomic.AtomicLong -import kotlin.coroutines.Continuation -import kotlin.coroutines.CoroutineContext -import kotlin.coroutines.EmptyCoroutineContext -import kotlin.coroutines.startCoroutine +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater +import kotlin.concurrent.Volatile +import kotlin.coroutines.* import kotlin.time.Duration.Companion.milliseconds @@ -47,7 +50,7 @@ private fun createDefaultDispatcher( if (coreSize == null && maxSize == null && keepAliveTime == null) { return null } - // cpu / 2 or 1 + // cpu / 2 or 8 val availableProcessors = Runtime.getRuntime().availableProcessors() val coreSize = (coreSize ?: (availableProcessors / 2)).coerceAtLeast(8) val maxSize = @@ -90,6 +93,7 @@ private const val DISPATCHER_USE_DEFAULT_PROPERTY_VALUE = "default" private const val DISPATCHER_USE_MAIN_PROPERTY_VALUE = "main" private const val DISPATCHER_USE_UNCONFINED_PROPERTY_VALUE = "unconfined" private const val DISPATCHER_USE_FORK_JOIN_POOL_PROPERTY_VALUE = "forkJoinPool" +private const val DISPATCHER_USE_CUSTOM_PROPERTY_VALUE = "custom" private const val DISPATCHER_LIMITED_PARALLELISM = "limitedParallelism" @@ -110,6 +114,80 @@ private const val ASYNC_DISPATCHER_KEEP_ALIVE_TIME_PROPERTY = "$ASYNC_DISPATCHER // endregion +/** + * 提供一个用于阻塞调用的调度器的供应商。 + * + * 当JVM参数 `simbot.runInBlocking.dispatcher` 的值为 `custom` 时: + * ``` + * -Dsimbot.runInBlocking.dispatcher=custom + * ``` + * 会通过 SPI 加载 [CustomBlockingDispatcherProvider] 来作为阻塞调用时的默认调度器。 + * + * 如果 SPI 检测到环境种存在多个 `CustomBlockingExecutorProvider`,会输出警告并就近选择其中一个。 + * + * 对于不是非常容易得到 [CoroutineDispatcher] 类型的情况(例如使用Java)或希望单纯提供一个 [Executor] 时, + * 可以选择使用 [CustomBlockingExecutorProvider] 类型。 + * + * @see CustomBlockingDispatcherProvider + * @since 3.3.0 + */ +public abstract class CustomBlockingDispatcherProvider { + /** + * 得到用于阻塞调用的调度器。 + */ + public abstract fun blockingDispatcher(): CoroutineDispatcher +} + +/** + * 提供一个用于阻塞调用的调度器的供应商。 + * + * @see CustomBlockingDispatcherProvider + * @since 3.3.0 + */ +public abstract class CustomBlockingExecutorProvider : CustomBlockingDispatcherProvider() { + final override fun blockingDispatcher(): CoroutineDispatcher = blockingExecutor().asCoroutineDispatcher() + + /** + * 得到用于阻塞调用的 [Executor]。 + * 会通过 [asCoroutineDispatcher] 转化为 [CoroutineDispatcher]。 + */ + public abstract fun blockingExecutor(): Executor +} + + +private class CustomBlockingDispatcherProviderNotFoundException( + classLoader: ClassLoader? +) : RuntimeException("System property 'simbot.runInBlocking.dispatcher' is 'custom', but there is no provider loaded via classLoader $classLoader") + + +private fun loadCustomBlockingDispatcher(loader: ClassLoader?): CoroutineDispatcher { + val serviceLoader = ServiceLoader.load(CustomBlockingDispatcherProvider::class.java, loader) + val services = serviceLoader.toList() + + if (services.isEmpty()) { + throw CustomBlockingDispatcherProviderNotFoundException(loader) + } + + val first = services.first() + val dis = first.blockingDispatcher() + + if (services.size > 1) { + // log + logger.warn( + "System property 'simbot.runInBlocking.dispatcher' is 'custom', and the size of providers are more than 1: {}", + services.size + ) + for ((index, provider) in services.withIndex()) { + logger.warn("index: {}, provider: {}", index, provider) + } + + logger.warn("Will choose the first (index=0) dispatcher {} of provider {}", dis, first) + } + + return dis +} + + /** * 使用在阻塞API(例如 [runInBlocking] )或非Java协程环境中的默认调度器。 * 会在首次被获取的时候进行实例化。 @@ -134,6 +212,7 @@ private const val ASYNC_DISPATCHER_KEEP_ALIVE_TIME_PROPERTY = "$ASYNC_DISPATCHER * | `simbot.runInBlocking.dispatcher=main` | [Dispatchers.Main] | 使用 [Dispatchers.Main] 作为默认调度器. | * | `simbot.runInBlocking.dispatcher=unconfined` | [Dispatchers.Unconfined] | 使用 [Dispatchers.Unconfined] 作为默认调度器. | * | `simbot.runInBlocking.dispatcher=forkJoinPool` | [ForkJoinPool] | 使用 [ForkJoinPool] 作为默认调度器. | + * | `simbot.runInBlocking.dispatcher=custom` | (since 3.3.0) 通过 SPI 加载 [CustomBlockingDispatcherProvider] 并通过其构建 [CoroutineDispatcher] | * * 如果选择了使用某个具体的调度器,那么你可以额外指定属性 `simbot.runInBlocking.dispatcher.limitedParallelism` 来通过 [CoroutineDispatcher.limitedParallelism] * 来限制使用的最大并发数。更多说明(和警告)参考 [CoroutineDispatcher.limitedParallelism]。 @@ -199,6 +278,7 @@ private inline fun initDefaultBlockingDispatcher( dispatcher eq DISPATCHER_USE_FORK_JOIN_POOL_PROPERTY_VALUE -> ForkJoinPool.commonPool() .asCoroutineDispatcher() + dispatcher eq DISPATCHER_USE_CUSTOM_PROPERTY_VALUE -> loadCustomBlockingDispatcher(Thread.currentThread().contextClassLoader) else -> null } @@ -278,7 +358,7 @@ public val DefaultBlockingContext: CoroutineContext by lazy { * | `simbot.runInAsync.dispatcher=main` | [Dispatchers.Main] | 使用 [Dispatchers.Main] 作为默认调度器. | * | `simbot.runInAsync.dispatcher=unconfined` | [Dispatchers.Unconfined] | 使用 [Dispatchers.Unconfined] 作为默认调度器. | * | `simbot.runInAsync.dispatcher=forkJoinPool` | [ForkJoinPool] | 使用 [ForkJoinPool] 作为默认调度器. | - * + * | `simbot.runInAsync.dispatcher=custom` | (since 3.3.0) 通过 SPI 加载 [CustomBlockingDispatcherProvider] 并通过其构建 [CoroutineDispatcher] | */ @InternalSimbotApi public val DefaultAsyncDispatcherOrNull: CoroutineDispatcher? by lazy { @@ -422,13 +502,13 @@ private object DefaultRunInNoScopeBlockingStrategy : RunInNoScopeBlockingStrateg override fun invoke(context: CoroutineContext, block: suspend () -> T): T { val runner = SuspendRunner(context) block.startCoroutine(runner) - return runner.await(SuspendRunner.isWaitTimeoutEnabled).getOrThrow() + return runner.await(SuspendRunner.isWaitTimeoutEnabled) } fun invokeWithoutTimeoutLog(context: CoroutineContext, block: suspend () -> T): T { val runner = SuspendRunner(context) block.startCoroutine(runner) - return runner.await(isWaitTimeoutEnabled = false).getOrThrow() + return runner.await(isWaitTimeoutEnabled = false) } } @@ -595,67 +675,166 @@ public fun `$$runInAsyncNullable`(block: suspend () -> T, scope: CoroutineSc // Yes. I am the BlockingRunner. -@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN") private class SuspendRunner(override val context: CoroutineContext = EmptyCoroutineContext) : Continuation { - private var result: Result? = null + @Suppress("unused") + @Volatile + var s: Int = 0 + // 1 = resume - success + // 2 = resume - exception + // 3 = suspend + + // 1 -> value + // 2 -> ex + // 3 -> CompletableFuture + @Volatile + var value: Any? = null + + private object NULL override fun resumeWith(result: Result) { - synchronized(this) { - this.result = result - (this as Object).notifyAll() + val resumed = + signalUpdater.compareAndSet( + this, + SIGNAL_NONE, + if (result.isSuccess) SIGNAL_RESUME_SUCCESS else SIGNAL_RESUME_FAILED + ) + if (!resumed) { + // value is future. + @Suppress("UNCHECKED_CAST") + valueUpdater.updateAndGet(this) { curr -> + ((curr as? CompletableFuture) ?: CompletableFuture()).also { f -> + result.onSuccess { value -> + f.complete(value) + }.onFailure { e -> + f.completeExceptionally(e) + } + } + } + } else { + // resume success, set value + result.onSuccess { value -> + valueUpdater.set(this, value ?: NULL) + }.onFailure { e -> + valueUpdater.set(this, e) + } } } - // @Suppress("BlockingMethodInNonBlockingContext") - fun await(isWaitTimeoutEnabled: Boolean): Result { - synchronized(this) { - @Suppress("UNUSED_VARIABLE") // emm? - var times = 0 - while (true) { - when (val result = this.result) { - null -> { - if (isWaitTimeoutEnabled) { - if (times > 0) { - val duration = (waitTimeout * times).milliseconds - if (logger.isDebugEnabled) { - val durationString = duration.toString() - logger.warn("Blocking runner has been blocking for at least {}.", durationString) - val e: Throwable = LongTimeBlockingException(durationString) - logger.debug( - "Long time blocking duration at least {}", - durationString, - e - ) - } else { - logger.warn( - "Blocking runner has been blocking for at least {}. Enable debug logging for '{}' for more stack information.", - duration.toString(), - LOGGER_NAME - ) - } - } - (this as Object).wait(waitTimeout) - times += 1 - - } else { - // just wait. - (this as Object).wait() - } + /** + * @see CompletableFuture.join + * @see CompletableFuture.get + * @throws CancellationException cancellation + * @throws CompletionException completion + * @throws ExecutionException if [ExecutionException.cause] is null + * @throws Exception [ExecutionException.cause] + */ + @Suppress("UNCHECKED_CAST") + fun await(isWaitTimeoutEnabled: Boolean): T { + val future: CompletableFuture + + // 预期为异步挂起。如果成功,value设置为 CompletableFuture. + if (signalUpdater.compareAndSet(this, SIGNAL_NONE, SIGNAL_SUSPEND)) { + future = valueUpdater.updateAndGet(this) { curr -> + (curr as? CompletableFuture) ?: CompletableFuture() + } as CompletableFuture + } else { + // 失败,则获取,或等待结果 + var value: Any? + do { + value = valueUpdater.get(this) + } while (value == null) + + if (value == NULL) { + return null as T + } - } + // success or failed + if (signalUpdater.get(this) == SIGNAL_RESUME_SUCCESS) { + return value as T + } - else -> { - return result - } + throw (value as Throwable) + } + + if (!isWaitTimeoutEnabled || (!logIfVirtual && Thread.currentThread().isVirtualThread())) { + try { + return future.get() + } catch (cancellation: CancellationException) { + throw cancellation + } catch (execution: ExecutionException) { + throw execution.cause ?: execution + } catch (other: Throwable) { + throw CompletionException(other) + } + } + + var times = 0 + while (!future.isDone && !Thread.currentThread().isInterrupted) { + if (times > 0) { + val duration = (waitTimeout * times).milliseconds + if (logger.isDebugEnabled) { + val durationString = duration.toString() + logger.warn("Blocking runner has been blocking for at least {}.", durationString) + val e: Throwable = LongTimeBlockingException(durationString) + logger.debug( + "Long time blocking duration at least {}", + durationString, + e + ) + } else { + logger.warn( + "Blocking runner has been blocking for at least {}. Enable debug logging for '{}' for more stack information.", + duration.toString(), + LOGGER_NAME + ) } } + + try { + return future.get(waitTimeout, TimeUnit.MILLISECONDS) + } catch (timeout: TimeoutException) { + times += 1 + } catch (cancellation: CancellationException) { + throw cancellation + } catch (execution: ExecutionException) { + throw execution.cause ?: execution + } catch (other: Throwable) { + throw CompletionException(other) + } } + + // done. + return future.join() } // for displaying the stack only private class LongTimeBlockingException(message: String) : RuntimeException(message) companion object { + private const val SIGNAL_NONE = 0 + private const val SIGNAL_RESUME_SUCCESS = 1 + private const val SIGNAL_RESUME_FAILED = 2 + private const val SIGNAL_SUSPEND = 3 + private val signalUpdater = AtomicIntegerFieldUpdater.newUpdater(SuspendRunner::class.java, "s") + private val valueUpdater = + AtomicReferenceFieldUpdater.newUpdater(SuspendRunner::class.java, Any::class.java, "value") +// private val futureUpdater = AtomicReferenceFieldUpdater.newUpdater(SuspendRunner::class.java, CompletableFuture::class.java, "future") + + // ignore for Virtual thread? + // val threadIsV = MethodHandles.publicLookup().findVirtual(Thread::class.java, "isVirtual", MethodType.methodType(java.lang.Boolean.TYPE)) + + private val isVirtualThreadFunc = runCatching<(Thread) -> Boolean> { + val mh = MethodHandles.publicLookup().findVirtual(Thread::class.java, "isVirtual", java.lang.invoke.MethodType.methodType(java.lang.Boolean.TYPE)) + return@runCatching { t -> mh.invoke(t) as Boolean } + }.getOrElse { + return@getOrElse { false } + } + + private fun Thread.isVirtualThread(): Boolean = isVirtualThreadFunc(this) + + private const val BLOCKING_RUNNER_WAIT_TIME_LOG_IF_VIRTUAL_PROPERTY_NAME = + "simbot.blockingRunner.waitTimeoutLogIfVirtual" + private const val BLOCKING_RUNNER_DEFAULT_WAIT_TIME_PROPERTY_NAME = "simbot.blockingRunner.waitTimeoutMilliseconds" private const val BLOCKING_RUNNER_DISABLE_WAIT_TIME_PROPERTY_NAME = "simbot.blockingRunner.disableWaitTimeout" @@ -664,6 +843,8 @@ private class SuspendRunner(override val context: CoroutineContext = EmptyCor systemLong(BLOCKING_RUNNER_DEFAULT_WAIT_TIME_PROPERTY_NAME)?.takeIf { it > 0 } ?: DEFAULT_WAIT_TIME internal val isWaitTimeoutEnabled = !systemBool(BLOCKING_RUNNER_DISABLE_WAIT_TIME_PROPERTY_NAME) + private val logIfVirtual = systemBool(BLOCKING_RUNNER_WAIT_TIME_LOG_IF_VIRTUAL_PROPERTY_NAME) + init { if (isWaitTimeoutEnabled) { logger.info( diff --git a/simbot-util-suspend-transformer/src/jvmTest/kotlin/BlockingRunnerTests.kt b/simbot-util-suspend-transformer/src/jvmTest/kotlin/BlockingRunnerTests.kt new file mode 100644 index 000000000..890387b52 --- /dev/null +++ b/simbot-util-suspend-transformer/src/jvmTest/kotlin/BlockingRunnerTests.kt @@ -0,0 +1,53 @@ +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.suspendCancellableCoroutine +import love.forte.simbot.utils.runInNoScopeBlocking +import kotlin.coroutines.resume + + +fun main() { + System.setProperty("simbot.blockingRunner.waitTimeoutMilliseconds", "2000") + System.setProperty("simbot.runInBlocking.dispatcher", "forkjoinpool") +// MethodHandles.publicLookup().findVirtual(Thread::class.java, "isVirtual", MethodType.methodType(java.lang.Boolean.TYPE)) + +// runInNoScopeBlocking { +// delay(5000) +// println("你好") +// } +// +// +// val value = runInNoScopeBlocking { +// delay(5000) +// kotlin.random.Random.nextLong() +// } +// +// println("value = $value") + + val value2 = runInNoScopeBlocking { + println("T[1]" + Thread.currentThread()) + coroutineScope { + println("T[2]" + Thread.currentThread()) + suspendCancellableCoroutine { c -> + println("T[3]" + Thread.currentThread()) + launch { + println("T[4]" + Thread.currentThread()) + delay(4000) + c.resume(kotlin.random.Random.nextLong()) + println("T[5]" + Thread.currentThread()) + } + } + println("T[6]" + Thread.currentThread()) + } + println("T[7]" + Thread.currentThread()) + } + + println("value2 = $value2") + +// runInNoScopeBlocking { +// delay(5000) +// throw RuntimeException() +// } + + println("Done.") +} diff --git a/website b/website index 1b5fe55a7..9f90f0c0f 160000 --- a/website +++ b/website @@ -1 +1 @@ -Subproject commit 1b5fe55a72453952be66bc742445bff6c80344c3 +Subproject commit 9f90f0c0ff1d876a5b934b422d9e2836fc99b64d