Skip to content

Commit

Permalink
update to cancel inactive HTTP/2 connection after it is hanging for 3…
Browse files Browse the repository at this point in the history
… seconds
  • Loading branch information
sunny-chung committed Jan 9, 2024
1 parent cefc877 commit cc5fa40
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ class NetworkClientManager : CallDataStore {
val oldCallId = requestExampleToCallMapping.put(requestExampleId, callData.id)
if (oldCallId != null) {
CoroutineScope(Dispatchers.IO).launch {
callDataMap[oldCallId]?.cancel?.invoke()
callDataMap[oldCallId]?.cancel?.invoke(null)
callDataMap.remove(oldCallId)
}
}
Expand All @@ -231,7 +231,7 @@ class NetworkClientManager : CallDataStore {
}

fun cancel(selectedRequestExampleId: String) {
getCallDataByRequestExampleId(selectedRequestExampleId)?.let { it.cancel() }
getCallDataByRequestExampleId(selectedRequestExampleId)?.let { it.cancel(null) }
}

private fun <T> emptySharedFlow() = emptyFlow<T>().shareIn(CoroutineScope(Dispatchers.IO), SharingStarted.Eagerly)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,19 @@ import com.sunnychung.application.multiplatform.hellohttp.network.apache.Http2Fr
import com.sunnychung.application.multiplatform.hellohttp.network.util.CallDataUserResponseUtil
import com.sunnychung.application.multiplatform.hellohttp.util.log
import com.sunnychung.lib.multiplatform.kdatetime.KInstant
import com.sunnychung.lib.multiplatform.kdatetime.extension.seconds
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.apache.hc.client5.http.SystemDefaultDnsResolver
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse
import org.apache.hc.client5.http.async.methods.SimpleResponseConsumer
Expand All @@ -44,6 +50,7 @@ import org.apache.hc.core5.http.nio.RequestChannel
import org.apache.hc.core5.http.protocol.HttpContext
import org.apache.hc.core5.http2.HttpVersionPolicy
import org.apache.hc.core5.http2.config.H2Config
import org.apache.hc.core5.http2.frame.FrameFlag
import org.apache.hc.core5.http2.frame.FrameType
import org.apache.hc.core5.http2.frame.RawFrame
import org.apache.hc.core5.http2.hpack.HPackInspectHeader
Expand All @@ -54,8 +61,11 @@ import java.net.InetAddress
import java.nio.ByteBuffer
import java.security.Principal
import java.security.cert.Certificate
import java.util.Collections
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import kotlin.coroutines.CoroutineContext

class ApacheHttpTransportClient(networkClientManager: NetworkClientManager) : AbstractTransportClient(networkClientManager) {

Expand Down Expand Up @@ -172,6 +182,9 @@ class ApacheHttpTransportClient(networkClientManager: NetworkClientManager) : Ab
},
object : H2InspectListener {
val suspendedHeaderFrames = ConcurrentHashMap<Int, H2HeaderFrame>()
val openedStreamIds = Collections.newSetFromMap(ConcurrentHashMap<Int, Boolean>())
val lock = Mutex()
val isCancelled = AtomicBoolean(false)

override fun onHeaderInputDecoded(connection: HttpConnection, streamId: Int?, headers: MutableList<HPackInspectHeader>) {
val serialized = http2FrameSerializer.serializeHeaders(headers)
Expand Down Expand Up @@ -211,6 +224,14 @@ class ApacheHttpTransportClient(networkClientManager: NetworkClientManager) : Ab
val type = FrameType.valueOf(frame.type)
log.d { "processFrame $streamId $type" }
if (type == FrameType.HEADERS) {
if (frame.flags and FrameFlag.END_STREAM.value == 0) {
openedStreamIds += streamId
} else {
openedStreamIds -= streamId
if (openedStreamIds.isEmpty()) {
checkForHangConnectionLater()
}
}
val frame = suspendedHeaderFrames.getOrPut(streamId) { H2HeaderFrame(streamId = streamId) }
frame.frameHeader = serialized
if (frame.isComplete()) {
Expand All @@ -232,6 +253,20 @@ class ApacheHttpTransportClient(networkClientManager: NetworkClientManager) : Ab
}
}

fun checkForHangConnectionLater() {
CoroutineScope(Dispatchers.IO).launch {
delay(3.seconds().toMilliseconds())
lock.withLock {
if (openedStreamIds.isEmpty() && !isCancelled.get()) {
val message = "The connection has no active stream for some seconds, it appears to be hanging. Cancelling the connection."
emitEvent(callId, message)
callData.cancel(Exception(message))
isCancelled.set(true)
}
}
}
}

}
)

Expand Down Expand Up @@ -304,7 +339,7 @@ class ApacheHttpTransportClient(networkClientManager: NetworkClientManager) : Ab
}
}

CoroutineScope(Dispatchers.IO).launch {
CoroutineScope(Dispatchers.IO).launch(coroutineExceptionHandler()) {
val callData = callData[callId]!!
callData.waitForPreparation()
log.d { "Call $callId is prepared" }
Expand Down Expand Up @@ -407,15 +442,15 @@ class ApacheHttpTransportClient(networkClientManager: NetworkClientManager) : Ab

})

data.cancel = {
data.cancel = { error ->
log.d { "Request to cancel the call" }
val cancelResult = call.cancel() // no use at all
log.d { "Cancel result = $cancelResult" }
httpClient.close(CloseMode.IMMEDIATE)

// httpClient.close is buggy. Do not rely on it
data.status = ConnectionStatus.DISCONNECTED
this.cancel()
this.cancel(error?.let { CancellationException(it.message, it) })
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ class GrpcTransportClient(networkClientManager: NetworkClientManager) : Abstract

var (responseFlow, responseObserver) = flowAndStreamObserver<DynamicMessage>()
try {
val cancel = {
val cancel = { _: Throwable? ->
if (call.status.isConnectionActive()) {
try {
responseObserver.onCompleted()
Expand Down Expand Up @@ -584,7 +584,7 @@ class GrpcTransportClient(networkClientManager: NetworkClientManager) : Abstract
log.d { "Response = ${responseJsonData.decodeToString()}" }
} catch (e: Throwable) {
setStreamError(e)
call.cancel()
call.cancel(e)
}

if (postFlightAction != null) {
Expand All @@ -604,7 +604,7 @@ class GrpcTransportClient(networkClientManager: NetworkClientManager) : Abstract
requestObserver.onNext(request)
} catch (e: Throwable) {
setStreamError(e)
call.cancel()
call.cancel(e)
}
}
}
Expand All @@ -618,7 +618,7 @@ class GrpcTransportClient(networkClientManager: NetworkClientManager) : Abstract
if (!hasInvokedCancelDueToError) {
hasInvokedCancelDueToError = true
setStreamError(e)
call.cancel()
call.cancel(e)
}
}
}
Expand All @@ -627,12 +627,12 @@ class GrpcTransportClient(networkClientManager: NetworkClientManager) : Abstract
fun initiateClientStreamableCall(requestObserver: StreamObserver<DynamicMessage>) {
call.sendPayload = buildSendPayloadFunction(requestObserver)
call.sendEndOfStream = buildSendEndOfStream(requestObserver)
call.cancel = {
call.cancel = { e ->
if (call.status.isConnectionActive()) {
try {
call.sendEndOfStream()
} catch (_: Throwable) {}
cancel()
cancel(e)
}
}
// actually, at this stage it could be not yet connected
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class CallData(
val optionalResponseSize: AtomicInteger,
val response: UserResponse,

var cancel: () -> Unit,
var cancel: (Throwable?) -> Unit,
var sendPayload: (String) -> Unit = {},
var sendEndOfStream: () -> Unit = {},
)
Expand Down

0 comments on commit cc5fa40

Please sign in to comment.