Skip to content

Commit

Permalink
Proper fix for #2284
Browse files Browse the repository at this point in the history
  • Loading branch information
guersam committed Sep 24, 2023
1 parent 5ca76dc commit c20d756
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 31 deletions.
56 changes: 38 additions & 18 deletions zio-http/src/main/scala/zio/http/netty/NettyBodyWriter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ import io.netty.channel._
import io.netty.handler.codec.http.{DefaultHttpContent, LastHttpContent}
object NettyBodyWriter {

def writeAndFlush(body: Body, ctx: ChannelHandlerContext)(implicit trace: Trace): Option[Task[Unit]] =
def writeAndFlush(body: Body, contentLength: Option[Long], ctx: ChannelHandlerContext)(implicit
trace: Trace,
): Option[Task[Unit]] =
body match {
case body: ByteBufBody =>
ctx.write(body.byteBuf)
Expand Down Expand Up @@ -66,27 +68,45 @@ object NettyBodyWriter {
None
case StreamBody(stream, _, _) =>
Some(
stream.chunks
.runFoldZIO(Option.empty[Chunk[Byte]]) {
case (Some(previous), current) =>
contentLength match {
case Some(length) =>
stream.chunks
.runFoldZIO(length) { (remaining, bytes) =>
remaining - bytes.size match {
case 0L =>
NettyFutureExecutor.executed {
// Flushes the last body content and LastHttpContent together to avoid race conditions.
ctx.write(new DefaultHttpContent(Unpooled.wrappedBuffer(bytes.toArray)))
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT)
}.as(0L)

case n =>
NettyFutureExecutor.executed {
ctx.writeAndFlush(new DefaultHttpContent(Unpooled.wrappedBuffer(bytes.toArray)))
}.as(n)
}
}
.flatMap {
case 0L => ZIO.unit
case remaining =>
val actualLength = length - remaining
ZIO.logWarning(s"Expected Content-Length of $length, but sent $actualLength bytes") *>
NettyFutureExecutor.executed {
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT)
}
}

case None =>
stream.chunks.mapZIO { bytes =>
NettyFutureExecutor.executed {
ctx.writeAndFlush(new DefaultHttpContent(Unpooled.wrappedBuffer(previous.toArray)))
} *>
ZIO.succeed(Some(current))
case (_, current) =>
ZIO.succeed(Some(current))
}
.flatMap { maybeLastChunk =>
// last chunk is handled separately to avoid fiber interrupt before EMPTY_LAST_CONTENT is sent
ZIO.attempt(
maybeLastChunk.foreach { lastChunk =>
ctx.write(new DefaultHttpContent(Unpooled.wrappedBuffer(lastChunk.toArray)))
},
) *>
ctx.writeAndFlush(new DefaultHttpContent(Unpooled.wrappedBuffer(bytes.toArray)))
}
}.runDrain.zipRight {
NettyFutureExecutor.executed {
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT)
}
},
}
},
)
case ChunkBody(data, _, _) =>
ctx.write(Unpooled.wrappedBuffer(data.toArray))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ final class ClientInboundHandler(
ctx.writeAndFlush(fullRequest)
case _: HttpRequest =>
ctx.write(jReq)
NettyBodyWriter.writeAndFlush(req.body, ctx).foreach { effect =>
NettyBodyWriter.writeAndFlush(req.body, None, ctx).foreach { effect =>
rtm.run(ctx, NettyRuntime.noopEnsuring)(effect)(Unsafe.unsafe, trace)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,13 @@ private[zio] final case class ServerInboundHandler(
val jResponse = NettyResponseEncoder.encode(ctx, response, runtime)
// setServerTime(time, response, jResponse)
ctx.writeAndFlush(jResponse)
if (!jResponse.isInstanceOf[FullHttpResponse])
NettyBodyWriter.writeAndFlush(response.body, ctx)
else
if (!jResponse.isInstanceOf[FullHttpResponse]) {
val contentLength = jResponse.headers.get(HttpHeaderNames.CONTENT_LENGTH) match {
case null => None
case value => Some(value.toLong)
}
NettyBodyWriter.writeAndFlush(response.body, contentLength, ctx)
} else
None
}
}
Expand Down
14 changes: 5 additions & 9 deletions zio-http/src/test/scala/zio/http/netty/NettyStreamBodySpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,9 @@ object NettyStreamBodySpec extends HttpRunnableSpec {
)
client <- ZIO.service[Client]
firstResponse <- makeRequest(client, port)
firstResponseBodyReceive <- firstResponse.body.asStream.chunks
.map(chunk => new String(chunk.toArray))
.mapZIO { chunk =>
atLeastOneChunkReceived.succeed(()) *> ZIO.succeed(chunk)
}
.runCollect
.fork
firstResponseBodyReceive <- firstResponse.body.asStream.chunks.mapZIO { chunk =>
atLeastOneChunkReceived.succeed(()).as(chunk.asString)
}.runCollect.fork
_ <- firstResponseQueue.offerAll(message.getBytes.toList)
_ <- atLeastOneChunkReceived.await
// saying that there will be no more data in the first response stream
Expand All @@ -93,8 +89,8 @@ object NettyStreamBodySpec extends HttpRunnableSpec {
// java.lang.IllegalStateException: unexpected message type: LastHttpContent"
// exception will be thrown
secondResponse <- makeRequest(client, port)
secondResponseBody <- secondResponse.body.asStream.chunks.map(chunk => new String(chunk.toArray)).runCollect
firstResponseBody <- firstResponseBodyReceive.join
secondResponseBody <- secondResponse.body.asStream.chunks.map(_.asString).runCollect
firstResponseBody <- firstResponseBodyReceive.join
value =
firstResponse.status == Status.Ok &&
// since response has not chunked transfer encoding header we can't guarantee that
Expand Down

0 comments on commit c20d756

Please sign in to comment.