From 2caf4cf0d9b7c22ef223a2b33ead31cbeeacc14e Mon Sep 17 00:00:00 2001 From: davidl Date: Fri, 27 Sep 2024 17:03:08 +0200 Subject: [PATCH] Updates after review --- zio-http/jvm/src/main/scala/zio/http/netty/NettyBody.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/zio-http/jvm/src/main/scala/zio/http/netty/NettyBody.scala b/zio-http/jvm/src/main/scala/zio/http/netty/NettyBody.scala index 7a3bb454f9..3356297616 100644 --- a/zio-http/jvm/src/main/scala/zio/http/netty/NettyBody.scala +++ b/zio-http/jvm/src/main/scala/zio/http/netty/NettyBody.scala @@ -154,7 +154,8 @@ object NettyBody extends BodyEncoding { queue <- ZIO.acquireRelease(Queue.unbounded[Take[E, A]])(_.shutdown) runtime <- ZIO.runtime[R] } yield { - val rtm = runtime.unsafe + val maybeRead = ZChannel.fromZIO(nettyRead.whenZIODiscard(queue.isEmpty)) + val rtm = runtime.unsafe register { k => try { rtm @@ -174,7 +175,7 @@ object NettyBody extends BodyEncoding { maybeError => ZChannel.fromZIO(queue.shutdown) *> maybeError.fold[ZChannel[Any, Any, Any, Any, E, Chunk[A], Unit]](ZChannel.unit)(ZChannel.fail(_)), - a => ZChannel.write(a) *> ZChannel.fromZIO(nettyRead.whenZIO(queue.isEmpty)) *> loop, + a => ZChannel.write(a) *> maybeRead *> loop, ), )