diff --git a/kotlin-multiplatform/src/commonMain/kotlin/com/ricoh360/thetaclient/ThetaApi.kt b/kotlin-multiplatform/src/commonMain/kotlin/com/ricoh360/thetaclient/ThetaApi.kt index 1e0d8a6d46..e26aec8bb6 100644 --- a/kotlin-multiplatform/src/commonMain/kotlin/com/ricoh360/thetaclient/ThetaApi.kt +++ b/kotlin-multiplatform/src/commonMain/kotlin/com/ricoh360/thetaclient/ThetaApi.kt @@ -16,11 +16,13 @@ import io.ktor.client.statement.* import io.ktor.serialization.kotlinx.json.* import io.ktor.utils.io.* import io.ktor.utils.io.core.* +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flow import kotlinx.coroutines.runBlocking -import kotlinx.coroutines.sync.Semaphore import kotlinx.io.files.* import kotlinx.serialization.ExperimentalSerializationApi import kotlinx.serialization.json.Json @@ -30,7 +32,7 @@ internal const val ALLOWED_CAPTURE_INTERVAL = 1000 /** * Http client using [Ktor](https://jp.ktor.work/clients/index.html) */ -@OptIn(ExperimentalSerializationApi::class) // explicitNulls +@OptIn(ExperimentalSerializationApi::class, ExperimentalCoroutinesApi::class) // explicitNulls internal object ThetaApi { val httpClient: HttpClient // for commands other than getLivePreview command get() = getHttpClient() @@ -41,7 +43,7 @@ internal object ThetaApi { val multipartPostClient: MultipartPostClient // just for updateFirmware protcol get() = getMultipartPostClient() - val requestSemaphore = Semaphore(1) + val requestScope = CoroutineScope(Dispatchers.Default.limitedParallelism(1)) var lastSetTimeConsumingOptionTime: Long = 0 var currentOptions = Options() @@ -65,7 +67,7 @@ internal object ThetaApi { suspend fun callInfoApi( endpoint: String, ): InfoApiResponse { - return syncExecutor(requestSemaphore, ApiClient.timeout.requestTimeout) { + return syncExecutor(requestScope, ApiClient.timeout.requestTimeout) { httpClient.get(getApiUrl(endpoint, InfoApi.PATH)).body() } } @@ -84,7 +86,7 @@ internal object ThetaApi { suspend fun callLicenseApi( endpoint: String, ): HttpResponse { - return syncExecutor(requestSemaphore, ApiClient.timeout.requestTimeout) { + return syncExecutor(requestScope, ApiClient.timeout.requestTimeout) { httpClient.get(getApiUrl(endpoint, LicenseApi.PATH)) } } @@ -103,7 +105,7 @@ internal object ThetaApi { suspend fun callStateApi( endpoint: String, ): StateApiResponse { - return syncExecutor(requestSemaphore, ApiClient.timeout.requestTimeout) { + return syncExecutor(requestScope, ApiClient.timeout.requestTimeout) { httpClient.post(getApiUrl(endpoint, StateApi.PATH)).body() } } @@ -126,7 +128,7 @@ internal object ThetaApi { endpoint: String, params: StatusApiParams, ): CommandApiResponse { - return syncExecutor(requestSemaphore, ApiClient.timeout.requestTimeout) { + return syncExecutor(requestScope, ApiClient.timeout.requestTimeout) { val request = StatusApiRequest(name = params.name, id = params.id) val response = httpClient.post(getApiUrl(endpoint, StatusApi.PATH)) { headers { @@ -898,7 +900,7 @@ internal object ThetaApi { endpoint: String, body: T, ): HttpResponse { - return syncExecutor(requestSemaphore, ApiClient.timeout.requestTimeout) { + return syncExecutor(requestScope, ApiClient.timeout.requestTimeout) { httpClient.post(getApiUrl(endpoint, CommandApi.PATH)) { headers { append("Content-Type", "application/json; charset=utf-8") diff --git a/kotlin-multiplatform/src/commonMain/kotlin/com/ricoh360/thetaclient/Util.kt b/kotlin-multiplatform/src/commonMain/kotlin/com/ricoh360/thetaclient/Util.kt index 87a1f6277d..ab919fc788 100644 --- a/kotlin-multiplatform/src/commonMain/kotlin/com/ricoh360/thetaclient/Util.kt +++ b/kotlin-multiplatform/src/commonMain/kotlin/com/ricoh360/thetaclient/Util.kt @@ -1,7 +1,10 @@ package com.ricoh360.thetaclient import korlibs.crypto.md5 -import kotlinx.coroutines.sync.Semaphore +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking import kotlinx.coroutines.withTimeout internal const val HEX_CHARACTERS = "0123456789abcdef" @@ -24,22 +27,22 @@ internal fun md5(data: String): String { } internal suspend fun syncExecutor( - semaphore: Semaphore, + scope: CoroutineScope, timeout: Long, run: suspend () -> T, ): T { - try { - withTimeout(timeout) { - semaphore.acquire() + val deferred = CompletableDeferred() + scope.launch { + runBlocking { + try { + withTimeout(timeout) { + val result = run() + deferred.complete(result) + } + } catch (e: Throwable) { + deferred.completeExceptionally(e) + } } - } catch (e: Throwable) { - println("timeout acquire") - throw e - } - try { - val result = run() - return result - } finally { - semaphore.release() } + return deferred.await() } diff --git a/kotlin-multiplatform/src/commonTest/kotlin/com/ricoh360/thetaclient/repository/ThetaRepositoryTest.kt b/kotlin-multiplatform/src/commonTest/kotlin/com/ricoh360/thetaclient/repository/ThetaRepositoryTest.kt index 7808f201db..29c521f9d9 100644 --- a/kotlin-multiplatform/src/commonTest/kotlin/com/ricoh360/thetaclient/repository/ThetaRepositoryTest.kt +++ b/kotlin-multiplatform/src/commonTest/kotlin/com/ricoh360/thetaclient/repository/ThetaRepositoryTest.kt @@ -7,7 +7,6 @@ import com.ricoh360.thetaclient.MockApiClient import com.ricoh360.thetaclient.ThetaRepository import io.ktor.http.* import io.ktor.utils.io.* -import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.delay import kotlinx.coroutines.joinAll import kotlinx.coroutines.launch @@ -15,7 +14,6 @@ import kotlinx.coroutines.runBlocking import kotlinx.coroutines.test.runTest import kotlin.test.* -@OptIn(ExperimentalCoroutinesApi::class) class ThetaRepositoryTest { private val endpoint = "http://192.168.1.1:80/" @@ -793,15 +791,20 @@ class ThetaRepositoryTest { @Test fun callSingleRequestTimeoutTest() = runBlocking { - var counter = 0 - val jsonString = Resource("src/commonTest/resources/info/info_z1.json").readText() - MockApiClient.onRequest = { _ -> - counter += 1 - runBlocking { - delay(500) + val jsonInfo = Resource("src/commonTest/resources/info/info_z1.json").readText() + val jsonState = Resource("src/commonTest/resources/state/state_z1.json").readText() + MockApiClient.onRequest = { request -> + when (request.url.encodedPath) { + "/osc/info" -> ByteReadChannel(jsonInfo) + "/osc/state" -> { + runBlocking { + delay(500) + } + ByteReadChannel(jsonState) + } + + else -> throw Exception("Error") } - counter -= 1 - ByteReadChannel(jsonString) } val timeout = ThetaRepository.Timeout( @@ -812,10 +815,15 @@ class ThetaRepositoryTest { val apiJobsList = listOf( launch { thetaRepository.getThetaInfo() + try { + thetaRepository.getThetaInfo() + } catch (e: ThetaRepository.NotConnectedException) { + assertTrue(false) + } }, launch { try { - thetaRepository.getThetaInfo() + thetaRepository.getThetaState() assertTrue(false) } catch (e: ThetaRepository.NotConnectedException) { assertTrue((e.message?.indexOf("time", 0, true) ?: -1) >= 0, "timeout error")