From ffca5071e558cdcf86f7e110dd0bd0261f88fa41 Mon Sep 17 00:00:00 2001 From: Nikita Myazin Date: Fri, 25 Aug 2023 22:50:23 +0300 Subject: [PATCH] invoke NettyBody callback in a separate fiber to avoid deadlocks --- .../main/scala/zio/http/netty/NettyBody.scala | 33 ++++++++++--------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/zio-http/src/main/scala/zio/http/netty/NettyBody.scala b/zio-http/src/main/scala/zio/http/netty/NettyBody.scala index efa698ce52..114c0a9a33 100644 --- a/zio-http/src/main/scala/zio/http/netty/NettyBody.scala +++ b/zio-http/src/main/scala/zio/http/netty/NettyBody.scala @@ -25,11 +25,11 @@ import zio.stream.ZStream import zio.http.Body.{UnsafeBytes, UnsafeWriteable} import zio.http.internal.BodyEncoding -import zio.http.{Body, Boundary, Header, Headers, MediaType} +import zio.http.{Body, Boundary, Header, MediaType} import io.netty.buffer.{ByteBuf, ByteBufUtil} -import io.netty.channel.{Channel => JChannel} import io.netty.util.AsciiString + object NettyBody extends BodyEncoding { /** @@ -123,21 +123,24 @@ object NettyBody extends BodyEncoding { override def asStream(implicit trace: Trace): ZStream[Any, Throwable, Byte] = ZStream - .async[Any, Throwable, Byte](emit => - try { - unsafeAsync(new UnsafeAsync { - override def apply(message: Chunk[Byte], isLast: Boolean): Unit = { - emit(ZIO.succeed(message)) - if (isLast) { - emit(ZIO.fail(None)) + .asyncZIO[Any, Throwable, Byte](emit => + ZIO.attempt { + unsafeAsync { + new UnsafeAsync { + override def apply(message: Chunk[Byte], isLast: Boolean): Unit = { + emit.chunk(message) + if (isLast) { + emit.end + } } + + override def fail(cause: Throwable): Unit = + emit.fail(cause) } - override def fail(cause: Throwable): Unit = - emit(ZIO.fail(Some(cause))) - }) - } catch { - case e: Throwable => emit(ZIO.fail(Option(e))) - }, + } + }.catchAll { e => + ZIO.succeed(emit.fail(e)) + }.fork, ) override def isComplete: Boolean = false