Skip to content

Commit

Permalink
[WIP] add basic load test for Apache HTTP client, without information…
Browse files Browse the repository at this point in the history
…al report
  • Loading branch information
sunny-chung committed Feb 25, 2024
1 parent dbb3c2d commit 11ec249
Show file tree
Hide file tree
Showing 20 changed files with 898 additions and 86 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.sunnychung.application.multiplatform.hellohttp.extension

import kotlin.math.roundToInt

/**
* Can only be used on a **sorted** list.
*
Expand All @@ -10,3 +12,16 @@ fun <T> List<T>.binarySearchForInsertionPoint(comparison: (T) -> Int): Int {
if (r >= 0) throw IllegalArgumentException("Parameter `comparison` should never return 0")
return -(r + 1)
}

fun <T : Number> List<T>.atPercent(percent: Int): Double {
if (isEmpty()) return 0.0
return if (percent == 50) {
if (size % 2 == 0) {
this[lastIndex / 2].toDouble() / 2.0 + this[lastIndex / 2 + 1].toDouble() / 2.0
} else {
this[lastIndex / 2].toDouble()
}
} else {
this[(lastIndex * percent.toDouble() / 100.0).roundToInt()].toDouble()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.sunnychung.application.multiplatform.hellohttp.extension

import java.util.SortedMap

fun <K, V> SortedMap<K, V>.lastOrNull(): V? =
if (isEmpty()) {
null
} else {
get(lastKey())
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,34 @@ import com.sunnychung.application.multiplatform.hellohttp.helper.VariableResolve
import com.sunnychung.application.multiplatform.hellohttp.model.Environment
import com.sunnychung.application.multiplatform.hellohttp.model.FieldValueType
import com.sunnychung.application.multiplatform.hellohttp.model.HttpConfig
import com.sunnychung.application.multiplatform.hellohttp.model.HttpRequest
import com.sunnychung.application.multiplatform.hellohttp.model.LoadTestInput
import com.sunnychung.application.multiplatform.hellohttp.model.LoadTestResponse
import com.sunnychung.application.multiplatform.hellohttp.model.LoadTestState
import com.sunnychung.application.multiplatform.hellohttp.model.LongDuplicateContainer
import com.sunnychung.application.multiplatform.hellohttp.model.PayloadMessage
import com.sunnychung.application.multiplatform.hellohttp.model.ProtocolApplication
import com.sunnychung.application.multiplatform.hellohttp.model.SslConfig
import com.sunnychung.application.multiplatform.hellohttp.model.UserKeyValuePair
import com.sunnychung.application.multiplatform.hellohttp.model.UserRequestTemplate
import com.sunnychung.application.multiplatform.hellohttp.model.UserResponse
import com.sunnychung.application.multiplatform.hellohttp.model.UserResponseByResponseTime
import com.sunnychung.application.multiplatform.hellohttp.network.CallData
import com.sunnychung.application.multiplatform.hellohttp.network.ConnectionStatus
import com.sunnychung.application.multiplatform.hellohttp.network.LiteCallData
import com.sunnychung.application.multiplatform.hellohttp.network.hostFromUrl
import com.sunnychung.application.multiplatform.hellohttp.util.log
import com.sunnychung.application.multiplatform.hellohttp.util.upsert
import com.sunnychung.application.multiplatform.hellohttp.util.uuidString
import com.sunnychung.lib.multiplatform.kdatetime.KInstant
import io.grpc.Status
import io.grpc.StatusRuntimeException
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancel
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
Expand Down Expand Up @@ -75,7 +85,15 @@ class NetworkClientManager : CallDataStore {
override fun provideCallDataStore(): ConcurrentHashMap<String, CallData> = callDataMap
override fun provideLiteCallDataStore(): ConcurrentHashMap<String, LiteCallData> = liteCallDataMap

fun fireRequest(request: UserRequestTemplate, requestExampleId: String, environment: Environment?, projectId: String, subprojectId: String) {
fun fireRequest(
request: UserRequestTemplate,
requestExampleId: String,
environment: Environment?,
projectId: String,
subprojectId: String,
fireType: UserResponse.Type,
parentLoadTestState: LoadTestState? = null,
) : CallData {
val callData = try {
val networkRequest = request.toHttpRequest(
exampleId = requestExampleId,
Expand Down Expand Up @@ -165,12 +183,7 @@ class NetworkClientManager : CallDataStore {
null
}

val networkManager = when (networkRequest.application) {
ProtocolApplication.Graphql -> graphqlSubscriptionTransportClient
ProtocolApplication.WebSocket -> webSocketTransportClient
ProtocolApplication.Grpc -> grpcTransportClient
else -> httpTransportClient
}
val networkManager = networkRequest.getNetworkManager()

networkManager.sendRequest(
request = networkRequest,
Expand All @@ -180,6 +193,8 @@ class NetworkClientManager : CallDataStore {
postFlightAction = postFlightAction,
httpConfig = environment?.httpConfig ?: HttpConfig(),
sslConfig = environment?.sslConfig ?: SslConfig(),
fireType = fireType,
parentLoadTestState = parentLoadTestState,
)
} catch (error: Throwable) {
val d = CallData(
Expand All @@ -193,6 +208,7 @@ class NetworkClientManager : CallDataStore {
isError = true,
errorMessage = error.message
),
fireType = fireType,

events = emptySharedFlow(),
eventsStateFlow = MutableStateFlow(null),
Expand All @@ -202,22 +218,167 @@ class NetworkClientManager : CallDataStore {
cancel = {},
)
log.d(error) { "Got error while firing request" }

if (parentLoadTestState != null) {
d.complete()
onCompleteResponse(d)
// callDataMap[parentLoadTestState.callId]?.cancel?.invoke(error)
}

// `networkManager.sendRequest` would update callDataMap, but on error nobody updates
// so manually update here
callDataMap[d.id] = d
d
}
val oldCallId = requestExampleToCallMapping.put(requestExampleId, callData.id)
if (oldCallId != null) {
CoroutineScope(Dispatchers.IO).launch {
callDataMap[oldCallId]?.cancel?.invoke(null)
callDataMap.remove(oldCallId)
if (fireType != UserResponse.Type.LoadTestChild) {
val oldCallId = requestExampleToCallMapping.put(requestExampleId, callData.id)
if (oldCallId != null) {
CoroutineScope(Dispatchers.IO).launch {
callDataMap[oldCallId]?.cancel?.invoke(null)
callDataMap.remove(oldCallId)
}
}
callIdFlow.value = callData.id

persistResponseManager.registerCall(callData)
}
callIdFlow.value = callData.id
persistResponseManager.registerCall(callData)
if (!callData.response.isError) {
callData.isPrepared = true
if (parentLoadTestState != null) {
parentLoadTestState.numRequestsSent.incrementAndGet()
}
}
return callData
}

private fun HttpRequest.getNetworkManager() = when (this.application) {
ProtocolApplication.Graphql -> graphqlSubscriptionTransportClient
ProtocolApplication.WebSocket -> webSocketTransportClient
ProtocolApplication.Grpc -> grpcTransportClient
else -> httpTransportClient
}

fun fireLoadTestRequests(
input: LoadTestInput,
request: UserRequestTemplate,
requestExampleId: String,
environment: Environment?,
projectId: String,
subprojectId: String,
) {
with (CoroutineScope(Dispatchers.IO)) {
launch {
val coroutineContext = currentCoroutineContext()
val input = input.copy(
requestId = request.id,
requestExampleId = requestExampleId,
application = ProtocolApplication.Http,
// requestData = // TODO refactor into specific transport manager
)
// TODO refactor into specific transport manager
val loadTestState = LoadTestState(input = input, startAt = KInstant.now(), callId = uuidString()) { resp ->
if (resp.isError) {
LoadTestResponse.Category.ClientError
} else {
if (resp.statusCode?.let { it >= 200 && it < 300 } == true) {
LoadTestResponse.Category.Success
} else {
LoadTestResponse.Category.ServerError
}
}
}

var isCompleted = false
val networkRequest = request.toHttpRequest(
exampleId = requestExampleId,
environment = environment
)
val networkManager = networkRequest.getNetworkManager()
val callData = networkManager.createCallData(
callId = loadTestState.callId,
requestBodySize = null,
requestExampleId = requestExampleId,
requestId = request.id,
subprojectId = subprojectId,
sslConfig = environment?.sslConfig ?: SslConfig(),
fireType = UserResponse.Type.LoadTest,
loadTestState = loadTestState,
)
callData.cancel = { e ->
isCompleted = true
callData.status = ConnectionStatus.DISCONNECTED
callData.response.loadTestResult = runBlocking { loadTestState.toResult(1000L) }
networkManager.emitEvent(callData.id, "Completed")
coroutineContext.cancel(e?.let { CancellationException("Cancelled due to error: ${e.message}", e) })
}
callData.status = ConnectionStatus.CONNECTING
callData.response.startAt = KInstant.now()
persistResponseManager.registerCall(callData)

val endTime = callData.response.startAt!! + input.intendedDuration

val oldCallId = requestExampleToCallMapping.put(requestExampleId, callData.id)
if (oldCallId != null) {
CoroutineScope(Dispatchers.IO).launch {
callDataMap[oldCallId]?.cancel?.invoke(null)
callDataMap.remove(oldCallId)
}
}
callIdFlow.value = callData.id

val jobs = (1..input.numConcurrent).map { i ->
launch {
do {
log.v { "LoadTest fireRequest C#$i" }
val call = fireRequest(
request = request,
requestExampleId = requestExampleId,
environment = environment,
projectId = projectId,
subprojectId = subprojectId,
fireType = UserResponse.Type.LoadTestChild,
parentLoadTestState = loadTestState,
)
call.awaitComplete()
log.v { "LoadTest complete C#$i" }
onCompleteResponse(call)
log.v { "LoadTest onCompleteResponse C#$i" }
} while (
callData.status != ConnectionStatus.DISCONNECTED // not cancelled
&& !call.response.isError // not client-side error
&& KInstant.now() < endTime
)
}
}

val reportJob = launch {
while (!isCompleted) {
callData.response.loadTestResult = loadTestState.toResult(1000L)
networkManager.emitEvent(callData.id, "update report")
delay(1000)
}
callData.response.loadTestResult = loadTestState.toResult(1000L)
networkManager.emitEvent(callData.id, "update report")
}

jobs.forEach { it.join() }
callData.response.endAt = KInstant.now()
isCompleted = true
reportJob.join()
callData.response.loadTestResult = loadTestState.toResult(1000L)
callData.status = ConnectionStatus.DISCONNECTED
networkManager.emitEvent(callData.id, "Completed")
log.d { "Complete load test. Result: ${callData.response.loadTestResult}" }
}
}
}

private fun onCompleteResponse(call: CallData) {
val response = call.response
call.loadTestState?.let { loadTestState ->
val endAt = response.endAt ?: KInstant.now()
loadTestState.latenciesMs += LongDuplicateContainer((endAt - response.startAt!!).toMilliseconds())
loadTestState.responsesOverResponseTime += UserResponseByResponseTime(response)
}
}

Expand Down
Loading

0 comments on commit 11ec249

Please sign in to comment.