From dc2237efa506c71d6fe0ffd9ebbe6c3bd3b7c2de Mon Sep 17 00:00:00 2001 From: Kyri Petrou Date: Fri, 14 Jun 2024 11:48:13 +1000 Subject: [PATCH] Fix memory leak in netty connection pool --- .../zio/http/netty/AsyncBodyReader.scala | 1 + .../main/scala/zio/http/netty/NettyBody.scala | 10 ++++- .../http/netty/client/NettyClientDriver.scala | 40 ++++++++----------- .../client/NettyConnectionPoolSpec.scala | 33 ++++++++++++++- 4 files changed, 59 insertions(+), 25 deletions(-) diff --git a/zio-http/jvm/src/main/scala/zio/http/netty/AsyncBodyReader.scala b/zio-http/jvm/src/main/scala/zio/http/netty/AsyncBodyReader.scala index db11dfa77b..fc97550cd7 100644 --- a/zio-http/jvm/src/main/scala/zio/http/netty/AsyncBodyReader.scala +++ b/zio-http/jvm/src/main/scala/zio/http/netty/AsyncBodyReader.scala @@ -44,6 +44,7 @@ abstract class AsyncBodyReader extends SimpleChannelInboundHandler[HttpContent]( case None => false case Some((_, isLast)) => isLast } + buffer.clear() // GC if (ctx.channel.isOpen || readingDone) { state = State.Direct(callback) diff --git a/zio-http/jvm/src/main/scala/zio/http/netty/NettyBody.scala b/zio-http/jvm/src/main/scala/zio/http/netty/NettyBody.scala index ee58652262..86afacdfa1 100644 --- a/zio-http/jvm/src/main/scala/zio/http/netty/NettyBody.scala +++ b/zio-http/jvm/src/main/scala/zio/http/netty/NettyBody.scala @@ -161,9 +161,17 @@ object NettyBody extends BodyEncoding { } catch { case e: Throwable => emit(ZIO.fail(Option(e))) }, - 4096, + streamBufferSize, ) + // No need to create a large buffer when we know the response is small + private[this] def streamBufferSize: Int = { + val cl = knownContentLength.getOrElse(4096L) + if (cl <= 16L) 16 + else if (cl >= 4096) 4096 + else Integer.highestOneBit(cl.toInt - 1) << 1 // Round to next power of 2 + } + override def isComplete: Boolean = false override def isEmpty: Boolean = false diff --git a/zio-http/jvm/src/main/scala/zio/http/netty/client/NettyClientDriver.scala b/zio-http/jvm/src/main/scala/zio/http/netty/client/NettyClientDriver.scala index 0a0b4e44a5..d7f44ae714 100644 --- a/zio-http/jvm/src/main/scala/zio/http/netty/client/NettyClientDriver.scala +++ b/zio-http/jvm/src/main/scala/zio/http/netty/client/NettyClientDriver.scala @@ -32,6 +32,7 @@ import io.netty.channel.{Channel, ChannelFactory, ChannelFuture, ChannelHandler, import io.netty.handler.codec.PrematureChannelClosureException import io.netty.handler.codec.http.websocketx.{WebSocketClientProtocolHandler, WebSocketFrame => JWebSocketFrame} import io.netty.handler.codec.http.{FullHttpRequest, HttpObjectAggregator, HttpRequest} +import io.netty.util.concurrent.GenericFutureListener final case class NettyClientDriver private[netty] ( channelFactory: ChannelFactory[Channel], @@ -51,7 +52,7 @@ final case class NettyClientDriver private[netty] ( createSocketApp: () => WebSocketApp[Any], webSocketConfig: WebSocketConfig, )(implicit trace: Trace): ZIO[Scope, Throwable, ChannelInterface] = { - val f = NettyRequestEncoder.encode(req).flatMap { jReq => + NettyRequestEncoder.encode(req).flatMap { jReq => for { _ <- Scope.addFinalizer { ZIO.attempt { @@ -135,26 +136,7 @@ final case class NettyClientDriver private[netty] ( val frozenToRemove = toRemove.toSet - new ChannelInterface { - override def resetChannel: ZIO[Any, Throwable, ChannelState] = - ZIO.attempt { - frozenToRemove.foreach(pipeline.remove) - ChannelState.Reusable // channel can be reused - } - - override def interrupt: ZIO[Any, Throwable, Unit] = - NettyFutureExecutor.executed(channel.disconnect()) - } - } - } - } - - f.ensuring { - // If the channel was closed and the promises were not completed, this will lead to the request hanging so we need - // to listen to the close future and complete the promises - ZIO.unless(location.scheme.isWebSocket) { - ZIO.succeedUnsafe { implicit u => - channel.closeFuture().addListener { (_: ChannelFuture) => + val closeListener: GenericFutureListener[ChannelFuture] = (_: ChannelFuture) => // If onComplete was already set, it means another fiber is already in the process of fulfilling the promises // so we don't need to fulfill `onResponse` nettyRuntime.unsafeRunSync { @@ -167,12 +149,24 @@ final case class NettyClientDriver private[netty] ( ), ) .unit - } + }(Unsafe.unsafe, trace) + + channel.closeFuture().addListener(closeListener) + + new ChannelInterface { + override def resetChannel: ZIO[Any, Throwable, ChannelState] = + ZIO.attempt { + channel.closeFuture().removeListener(closeListener) + frozenToRemove.foreach(pipeline.remove) + ChannelState.Reusable // channel can be reused + } + + override def interrupt: ZIO[Any, Throwable, Unit] = + NettyFutureExecutor.executed(channel.disconnect()) } } } } - } override def createConnectionPool(dnsResolver: DnsResolver, config: ConnectionPoolConfig)(implicit diff --git a/zio-http/jvm/src/test/scala/zio/http/netty/client/NettyConnectionPoolSpec.scala b/zio-http/jvm/src/test/scala/zio/http/netty/client/NettyConnectionPoolSpec.scala index b7fe1b575a..e8f841aa7d 100644 --- a/zio-http/jvm/src/test/scala/zio/http/netty/client/NettyConnectionPoolSpec.scala +++ b/zio-http/jvm/src/test/scala/zio/http/netty/client/NettyConnectionPoolSpec.scala @@ -249,6 +249,37 @@ object NettyConnectionPoolSpec extends HttpRunnableSpec { serverTestLayer, ) @@ withLiveClock @@ nonFlaky(10) + private def connectionPoolIssuesSpec = { + suite("ConnectionPoolIssuesSpec")( + test("Reusing connections doesn't cause memory leaks") { + Random.nextString(1024 * 1024).flatMap { text => + val resp = Response.text(text) + Handler + .succeed(resp) + .toRoutes + .deployAndRequest { client => + ZIO.foreachParDiscard(0 to 10) { _ => + ZIO + .scoped[Any](client.request(Request()).flatMap(_.body.asArray)) + .repeatN(200) + } + }(Request()) + .as(assertCompletes) + } + }, + ) + }.provide( + ZLayer(appKeepAliveEnabled.unit), + DynamicServer.live, + serverTestLayer, + Client.customized, + ZLayer.succeed(ZClient.Config.default.dynamicConnectionPool(1, 512, 60.seconds)), + NettyClientDriver.live, + DnsResolver.default, + ZLayer.succeed(NettyConfig.defaultWithFastShutdown), + Scope.default, + ) + def connectionPoolSpec: Spec[Any, Throwable] = suite("ConnectionPool")( suite("fixed")( @@ -310,7 +341,7 @@ object NettyConnectionPoolSpec extends HttpRunnableSpec { ) override def spec: Spec[Scope, Throwable] = { - connectionPoolSpec @@ sequential @@ withLiveClock + (connectionPoolSpec + connectionPoolIssuesSpec) @@ sequential @@ withLiveClock } }