Skip to content

Commit

Permalink
Fix Client manual interruption (zio#2889)
Browse files Browse the repository at this point in the history
* Fix client connection manual interruption

* Add comment to spec
  • Loading branch information
kyri-petrou authored Jun 6, 2024
1 parent 80b604d commit a5fdfc1
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,14 @@ object NettyConnectionPool {
case _ => bootstrap
}).connect()
}
_ <- NettyFutureExecutor.executed(channelFuture)
ch <- ZIO.attempt(channelFuture.channel())
_ <- Scope.addFinalizer(NettyFutureExecutor.executed(ch.close()).when(ch.isOpen).ignoreLogged)
_ <- Scope.addFinalizer {
NettyFutureExecutor.executed {
channelFuture.cancel(true)
ch.close()
}.when(ch.isOpen).ignoreLogged
}
_ <- NettyFutureExecutor.executed(channelFuture)
} yield ch
}

Expand Down
8 changes: 7 additions & 1 deletion zio-http/jvm/src/test/scala/zio/http/ClientSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import scala.annotation.nowarn

import zio._
import zio.test.Assertion._
import zio.test.TestAspect.{sequential, timeout, withLiveClock}
import zio.test.TestAspect.{flaky, sequential, timeout, withLiveClock}
import zio.test._

import zio.stream.ZStream
Expand Down Expand Up @@ -100,6 +100,12 @@ object ClientSpec extends HttpRunnableSpec {
val effect = app.deployAndRequest(requestCode).runZIO(())
assertZIO(effect)(isTrue)
},
test("request can be timed out manually while awaiting connection") {
// Unfortunately we have to use a real URL here, as we can't really simulate a long connection time
val url = URL.decode("https://test.com").toOption.get
val resp = ZIO.scoped(ZClient.request(Request.get(url))).timeout(500.millis)
assertZIO(resp)(isNone)
} @@ timeout(5.seconds) @@ flaky(5),
)

override def spec = {
Expand Down
36 changes: 21 additions & 15 deletions zio-http/shared/src/main/scala/zio/http/ZClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -682,26 +682,30 @@ object ZClient extends ZClientPlatformSpecific {
case location: Location.Absolute =>
ZIO.uninterruptibleMask { restore =>
for {
onComplete <- Promise.make[Throwable, ChannelState]
onResponse <- Promise.make[Throwable, Response]
connectionAcquired <- Ref.make(false)
onComplete <- Promise.make[Throwable, ChannelState]
onResponse <- Promise.make[Throwable, Response]
inChannelScope = outerScope match {
case Some(scope) => (zio: ZIO[Scope, Throwable, Unit]) => scope.extend(zio)
case None => (zio: ZIO[Scope, Throwable, Unit]) => ZIO.scoped(zio)
}
channelFiber <- inChannelScope {
for {
connection <- connectionPool
.get(
location,
clientConfig.proxy,
clientConfig.ssl.getOrElse(ClientSSLConfig.Default),
clientConfig.maxInitialLineLength,
clientConfig.maxHeaderSize,
clientConfig.requestDecompression,
clientConfig.idleTimeout,
clientConfig.connectionTimeout,
clientConfig.localAddress,
)
connection <- restore(
connectionPool
.get(
location,
clientConfig.proxy,
clientConfig.ssl.getOrElse(ClientSSLConfig.Default),
clientConfig.maxInitialLineLength,
clientConfig.maxHeaderSize,
clientConfig.requestDecompression,
clientConfig.idleTimeout,
clientConfig.connectionTimeout,
clientConfig.localAddress,
),
)
.zipLeft(connectionAcquired.set(true))
.tapErrorCause(cause => onResponse.failCause(cause))
.map(_.asInstanceOf[driver.Connection])
channelInterface <-
Expand Down Expand Up @@ -742,7 +746,9 @@ object ZClient extends ZClientPlatformSpecific {
}.forkDaemon // Needs to live as long as the channel is alive, as the response body may be streaming
_ <- ZIO.addFinalizer(onComplete.interrupt)
response <- restore(onResponse.await.onInterrupt {
onComplete.interrupt *> channelFiber.join.orDie
ZIO.unlessZIO(connectionAcquired.get)(channelFiber.interrupt) *>
onComplete.interrupt *>
channelFiber.await
})
} yield response
}
Expand Down

0 comments on commit a5fdfc1

Please sign in to comment.