Skip to content

Commit

Permalink
Change exclusivity control of API calls
Browse files Browse the repository at this point in the history
  • Loading branch information
osakila authored Sep 4, 2024
1 parent 8df2f13 commit c04c1de
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ import io.ktor.client.statement.*
import io.ktor.serialization.kotlinx.json.*
import io.ktor.utils.io.*
import io.ktor.utils.io.core.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Semaphore
import kotlinx.io.files.*
import kotlinx.serialization.ExperimentalSerializationApi
import kotlinx.serialization.json.Json
Expand All @@ -30,7 +32,7 @@ internal const val ALLOWED_CAPTURE_INTERVAL = 1000
/**
* Http client using [Ktor](https://jp.ktor.work/clients/index.html)
*/
@OptIn(ExperimentalSerializationApi::class) // explicitNulls
@OptIn(ExperimentalSerializationApi::class, ExperimentalCoroutinesApi::class) // explicitNulls
internal object ThetaApi {
val httpClient: HttpClient // for commands other than getLivePreview command
get() = getHttpClient()
Expand All @@ -41,7 +43,7 @@ internal object ThetaApi {
val multipartPostClient: MultipartPostClient // just for updateFirmware protcol
get() = getMultipartPostClient()

val requestSemaphore = Semaphore(1)
val requestScope = CoroutineScope(Dispatchers.Default.limitedParallelism(1))
var lastSetTimeConsumingOptionTime: Long = 0
var currentOptions = Options()

Expand All @@ -65,7 +67,7 @@ internal object ThetaApi {
suspend fun callInfoApi(
endpoint: String,
): InfoApiResponse {
return syncExecutor(requestSemaphore, ApiClient.timeout.requestTimeout) {
return syncExecutor(requestScope, ApiClient.timeout.requestTimeout) {
httpClient.get(getApiUrl(endpoint, InfoApi.PATH)).body()
}
}
Expand All @@ -84,7 +86,7 @@ internal object ThetaApi {
suspend fun callLicenseApi(
endpoint: String,
): HttpResponse {
return syncExecutor(requestSemaphore, ApiClient.timeout.requestTimeout) {
return syncExecutor(requestScope, ApiClient.timeout.requestTimeout) {
httpClient.get(getApiUrl(endpoint, LicenseApi.PATH))
}
}
Expand All @@ -103,7 +105,7 @@ internal object ThetaApi {
suspend fun callStateApi(
endpoint: String,
): StateApiResponse {
return syncExecutor(requestSemaphore, ApiClient.timeout.requestTimeout) {
return syncExecutor(requestScope, ApiClient.timeout.requestTimeout) {
httpClient.post(getApiUrl(endpoint, StateApi.PATH)).body()
}
}
Expand All @@ -126,7 +128,7 @@ internal object ThetaApi {
endpoint: String,
params: StatusApiParams,
): CommandApiResponse {
return syncExecutor(requestSemaphore, ApiClient.timeout.requestTimeout) {
return syncExecutor(requestScope, ApiClient.timeout.requestTimeout) {
val request = StatusApiRequest(name = params.name, id = params.id)
val response = httpClient.post(getApiUrl(endpoint, StatusApi.PATH)) {
headers {
Expand Down Expand Up @@ -898,7 +900,7 @@ internal object ThetaApi {
endpoint: String,
body: T,
): HttpResponse {
return syncExecutor(requestSemaphore, ApiClient.timeout.requestTimeout) {
return syncExecutor(requestScope, ApiClient.timeout.requestTimeout) {
httpClient.post(getApiUrl(endpoint, CommandApi.PATH)) {
headers {
append("Content-Type", "application/json; charset=utf-8")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package com.ricoh360.thetaclient

import korlibs.crypto.md5
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout

internal const val HEX_CHARACTERS = "0123456789abcdef"
Expand All @@ -24,22 +27,22 @@ internal fun md5(data: String): String {
}

internal suspend fun <T> syncExecutor(
semaphore: Semaphore,
scope: CoroutineScope,
timeout: Long,
run: suspend () -> T,
): T {
try {
withTimeout(timeout) {
semaphore.acquire()
val deferred = CompletableDeferred<T>()
scope.launch {
runBlocking {
try {
withTimeout(timeout) {
val result = run()
deferred.complete(result)
}
} catch (e: Throwable) {
deferred.completeExceptionally(e)
}
}
} catch (e: Throwable) {
println("timeout acquire")
throw e
}
try {
val result = run()
return result
} finally {
semaphore.release()
}
return deferred.await()
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,13 @@ import com.ricoh360.thetaclient.MockApiClient
import com.ricoh360.thetaclient.ThetaRepository
import io.ktor.http.*
import io.ktor.utils.io.*
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.runTest
import kotlin.test.*

@OptIn(ExperimentalCoroutinesApi::class)
class ThetaRepositoryTest {
private val endpoint = "http://192.168.1.1:80/"

Expand Down Expand Up @@ -793,15 +791,20 @@ class ThetaRepositoryTest {

@Test
fun callSingleRequestTimeoutTest() = runBlocking {
var counter = 0
val jsonString = Resource("src/commonTest/resources/info/info_z1.json").readText()
MockApiClient.onRequest = { _ ->
counter += 1
runBlocking {
delay(500)
val jsonInfo = Resource("src/commonTest/resources/info/info_z1.json").readText()
val jsonState = Resource("src/commonTest/resources/state/state_z1.json").readText()
MockApiClient.onRequest = { request ->
when (request.url.encodedPath) {
"/osc/info" -> ByteReadChannel(jsonInfo)
"/osc/state" -> {
runBlocking {
delay(500)
}
ByteReadChannel(jsonState)
}

else -> throw Exception("Error")
}
counter -= 1
ByteReadChannel(jsonString)
}

val timeout = ThetaRepository.Timeout(
Expand All @@ -812,10 +815,15 @@ class ThetaRepositoryTest {
val apiJobsList = listOf(
launch {
thetaRepository.getThetaInfo()
try {
thetaRepository.getThetaInfo()
} catch (e: ThetaRepository.NotConnectedException) {
assertTrue(false)
}
},
launch {
try {
thetaRepository.getThetaInfo()
thetaRepository.getThetaState()
assertTrue(false)
} catch (e: ThetaRepository.NotConnectedException) {
assertTrue((e.message?.indexOf("time", 0, true) ?: -1) >= 0, "timeout error")
Expand Down

0 comments on commit c04c1de

Please sign in to comment.