Skip to content

Commit

Permalink
invoke NettyBody callback in a separate fiber to avoid deadlocks
Browse files Browse the repository at this point in the history
  • Loading branch information
myazinn authored and nikita myazin committed Sep 24, 2023
1 parent 4123b17 commit ffca507
Showing 1 changed file with 18 additions and 15 deletions.
33 changes: 18 additions & 15 deletions zio-http/src/main/scala/zio/http/netty/NettyBody.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ import zio.stream.ZStream

import zio.http.Body.{UnsafeBytes, UnsafeWriteable}
import zio.http.internal.BodyEncoding
import zio.http.{Body, Boundary, Header, Headers, MediaType}
import zio.http.{Body, Boundary, Header, MediaType}

import io.netty.buffer.{ByteBuf, ByteBufUtil}
import io.netty.channel.{Channel => JChannel}
import io.netty.util.AsciiString

object NettyBody extends BodyEncoding {

/**
Expand Down Expand Up @@ -123,21 +123,24 @@ object NettyBody extends BodyEncoding {

override def asStream(implicit trace: Trace): ZStream[Any, Throwable, Byte] =
ZStream
.async[Any, Throwable, Byte](emit =>
try {
unsafeAsync(new UnsafeAsync {
override def apply(message: Chunk[Byte], isLast: Boolean): Unit = {
emit(ZIO.succeed(message))
if (isLast) {
emit(ZIO.fail(None))
.asyncZIO[Any, Throwable, Byte](emit =>
ZIO.attempt {
unsafeAsync {
new UnsafeAsync {
override def apply(message: Chunk[Byte], isLast: Boolean): Unit = {
emit.chunk(message)
if (isLast) {
emit.end
}
}

override def fail(cause: Throwable): Unit =
emit.fail(cause)
}
override def fail(cause: Throwable): Unit =
emit(ZIO.fail(Some(cause)))
})
} catch {
case e: Throwable => emit(ZIO.fail(Option(e)))
},
}
}.catchAll { e =>
ZIO.succeed(emit.fail(e))
}.fork,
)

override def isComplete: Boolean = false
Expand Down

0 comments on commit ffca507

Please sign in to comment.