Skip to content

Commit

Permalink
Fix memory leak in netty connection pool
Browse files Browse the repository at this point in the history
  • Loading branch information
kyri-petrou committed Jun 14, 2024
1 parent ec93989 commit dc2237e
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ abstract class AsyncBodyReader extends SimpleChannelInboundHandler[HttpContent](
case None => false
case Some((_, isLast)) => isLast
}
buffer.clear() // GC

if (ctx.channel.isOpen || readingDone) {
state = State.Direct(callback)
Expand Down
10 changes: 9 additions & 1 deletion zio-http/jvm/src/main/scala/zio/http/netty/NettyBody.scala
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,17 @@ object NettyBody extends BodyEncoding {
} catch {
case e: Throwable => emit(ZIO.fail(Option(e)))
},
4096,
streamBufferSize,
)

// No need to create a large buffer when we know the response is small
private[this] def streamBufferSize: Int = {
val cl = knownContentLength.getOrElse(4096L)
if (cl <= 16L) 16
else if (cl >= 4096) 4096
else Integer.highestOneBit(cl.toInt - 1) << 1 // Round to next power of 2
}

override def isComplete: Boolean = false

override def isEmpty: Boolean = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import io.netty.channel.{Channel, ChannelFactory, ChannelFuture, ChannelHandler,
import io.netty.handler.codec.PrematureChannelClosureException
import io.netty.handler.codec.http.websocketx.{WebSocketClientProtocolHandler, WebSocketFrame => JWebSocketFrame}
import io.netty.handler.codec.http.{FullHttpRequest, HttpObjectAggregator, HttpRequest}
import io.netty.util.concurrent.GenericFutureListener

final case class NettyClientDriver private[netty] (
channelFactory: ChannelFactory[Channel],
Expand All @@ -51,7 +52,7 @@ final case class NettyClientDriver private[netty] (
createSocketApp: () => WebSocketApp[Any],
webSocketConfig: WebSocketConfig,
)(implicit trace: Trace): ZIO[Scope, Throwable, ChannelInterface] = {
val f = NettyRequestEncoder.encode(req).flatMap { jReq =>
NettyRequestEncoder.encode(req).flatMap { jReq =>
for {
_ <- Scope.addFinalizer {
ZIO.attempt {
Expand Down Expand Up @@ -135,26 +136,7 @@ final case class NettyClientDriver private[netty] (

val frozenToRemove = toRemove.toSet

new ChannelInterface {
override def resetChannel: ZIO[Any, Throwable, ChannelState] =
ZIO.attempt {
frozenToRemove.foreach(pipeline.remove)
ChannelState.Reusable // channel can be reused
}

override def interrupt: ZIO[Any, Throwable, Unit] =
NettyFutureExecutor.executed(channel.disconnect())
}
}
}
}

f.ensuring {
// If the channel was closed and the promises were not completed, this will lead to the request hanging so we need
// to listen to the close future and complete the promises
ZIO.unless(location.scheme.isWebSocket) {
ZIO.succeedUnsafe { implicit u =>
channel.closeFuture().addListener { (_: ChannelFuture) =>
val closeListener: GenericFutureListener[ChannelFuture] = (_: ChannelFuture) =>
// If onComplete was already set, it means another fiber is already in the process of fulfilling the promises
// so we don't need to fulfill `onResponse`
nettyRuntime.unsafeRunSync {
Expand All @@ -167,12 +149,24 @@ final case class NettyClientDriver private[netty] (
),
)
.unit
}
}(Unsafe.unsafe, trace)

channel.closeFuture().addListener(closeListener)

new ChannelInterface {
override def resetChannel: ZIO[Any, Throwable, ChannelState] =
ZIO.attempt {
channel.closeFuture().removeListener(closeListener)
frozenToRemove.foreach(pipeline.remove)
ChannelState.Reusable // channel can be reused
}

override def interrupt: ZIO[Any, Throwable, Unit] =
NettyFutureExecutor.executed(channel.disconnect())
}
}
}
}

}

override def createConnectionPool(dnsResolver: DnsResolver, config: ConnectionPoolConfig)(implicit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,37 @@ object NettyConnectionPoolSpec extends HttpRunnableSpec {
serverTestLayer,
) @@ withLiveClock @@ nonFlaky(10)

private def connectionPoolIssuesSpec = {
suite("ConnectionPoolIssuesSpec")(
test("Reusing connections doesn't cause memory leaks") {
Random.nextString(1024 * 1024).flatMap { text =>
val resp = Response.text(text)
Handler
.succeed(resp)
.toRoutes
.deployAndRequest { client =>
ZIO.foreachParDiscard(0 to 10) { _ =>
ZIO
.scoped[Any](client.request(Request()).flatMap(_.body.asArray))
.repeatN(200)
}
}(Request())
.as(assertCompletes)
}
},
)
}.provide(
ZLayer(appKeepAliveEnabled.unit),
DynamicServer.live,
serverTestLayer,
Client.customized,
ZLayer.succeed(ZClient.Config.default.dynamicConnectionPool(1, 512, 60.seconds)),
NettyClientDriver.live,
DnsResolver.default,
ZLayer.succeed(NettyConfig.defaultWithFastShutdown),
Scope.default,
)

def connectionPoolSpec: Spec[Any, Throwable] =
suite("ConnectionPool")(
suite("fixed")(
Expand Down Expand Up @@ -310,7 +341,7 @@ object NettyConnectionPoolSpec extends HttpRunnableSpec {
)

override def spec: Spec[Scope, Throwable] = {
connectionPoolSpec @@ sequential @@ withLiveClock
(connectionPoolSpec + connectionPoolIssuesSpec) @@ sequential @@ withLiveClock
}

}

0 comments on commit dc2237e

Please sign in to comment.