diff --git a/zio-http/src/main/scala/zio/http/netty/NettyBodyWriter.scala b/zio-http/src/main/scala/zio/http/netty/NettyBodyWriter.scala index b6e34896b2..1fd1b23501 100644 --- a/zio-http/src/main/scala/zio/http/netty/NettyBodyWriter.scala +++ b/zio-http/src/main/scala/zio/http/netty/NettyBodyWriter.scala @@ -29,7 +29,9 @@ import io.netty.channel._ import io.netty.handler.codec.http.{DefaultHttpContent, LastHttpContent} object NettyBodyWriter { - def writeAndFlush(body: Body, ctx: ChannelHandlerContext)(implicit trace: Trace): Option[Task[Unit]] = + def writeAndFlush(body: Body, contentLength: Option[Long], ctx: ChannelHandlerContext)(implicit + trace: Trace, + ): Option[Task[Unit]] = body match { case body: ByteBufBody => ctx.write(body.byteBuf) @@ -66,27 +68,45 @@ object NettyBodyWriter { None case StreamBody(stream, _, _) => Some( - stream.chunks - .runFoldZIO(Option.empty[Chunk[Byte]]) { - case (Some(previous), current) => + contentLength match { + case Some(length) => + stream.chunks + .runFoldZIO(length) { (remaining, bytes) => + remaining - bytes.size match { + case 0L => + NettyFutureExecutor.executed { + // Flushes the last body content and LastHttpContent together to avoid race conditions. + ctx.write(new DefaultHttpContent(Unpooled.wrappedBuffer(bytes.toArray))) + ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT) + }.as(0L) + + case n => + NettyFutureExecutor.executed { + ctx.writeAndFlush(new DefaultHttpContent(Unpooled.wrappedBuffer(bytes.toArray))) + }.as(n) + } + } + .flatMap { + case 0L => ZIO.unit + case remaining => + val actualLength = length - remaining + ZIO.logWarning(s"Expected Content-Length of $length, but sent $actualLength bytes") *> + NettyFutureExecutor.executed { + ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT) + } + } + + case None => + stream.chunks.mapZIO { bytes => NettyFutureExecutor.executed { - ctx.writeAndFlush(new DefaultHttpContent(Unpooled.wrappedBuffer(previous.toArray))) - } *> - ZIO.succeed(Some(current)) - case (_, current) => - ZIO.succeed(Some(current)) - } - .flatMap { maybeLastChunk => - // last chunk is handled separately to avoid fiber interrupt before EMPTY_LAST_CONTENT is sent - ZIO.attempt( - maybeLastChunk.foreach { lastChunk => - ctx.write(new DefaultHttpContent(Unpooled.wrappedBuffer(lastChunk.toArray))) - }, - ) *> + ctx.writeAndFlush(new DefaultHttpContent(Unpooled.wrappedBuffer(bytes.toArray))) + } + }.runDrain.zipRight { NettyFutureExecutor.executed { ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT) } - }, + } + }, ) case ChunkBody(data, _, _) => ctx.write(Unpooled.wrappedBuffer(data.toArray)) diff --git a/zio-http/src/main/scala/zio/http/netty/client/ClientInboundHandler.scala b/zio-http/src/main/scala/zio/http/netty/client/ClientInboundHandler.scala index 4f57b773c1..a4ee256aa9 100644 --- a/zio-http/src/main/scala/zio/http/netty/client/ClientInboundHandler.scala +++ b/zio-http/src/main/scala/zio/http/netty/client/ClientInboundHandler.scala @@ -55,7 +55,7 @@ final class ClientInboundHandler( ctx.writeAndFlush(fullRequest) case _: HttpRequest => ctx.write(jReq) - NettyBodyWriter.writeAndFlush(req.body, ctx).foreach { effect => + NettyBodyWriter.writeAndFlush(req.body, None, ctx).foreach { effect => rtm.run(ctx, NettyRuntime.noopEnsuring)(effect)(Unsafe.unsafe, trace) } } diff --git a/zio-http/src/main/scala/zio/http/netty/server/ServerInboundHandler.scala b/zio-http/src/main/scala/zio/http/netty/server/ServerInboundHandler.scala index 5df9e87a94..f44c549d6a 100644 --- a/zio-http/src/main/scala/zio/http/netty/server/ServerInboundHandler.scala +++ b/zio-http/src/main/scala/zio/http/netty/server/ServerInboundHandler.scala @@ -172,9 +172,13 @@ private[zio] final case class ServerInboundHandler( val jResponse = NettyResponseEncoder.encode(ctx, response, runtime) // setServerTime(time, response, jResponse) ctx.writeAndFlush(jResponse) - if (!jResponse.isInstanceOf[FullHttpResponse]) - NettyBodyWriter.writeAndFlush(response.body, ctx) - else + if (!jResponse.isInstanceOf[FullHttpResponse]) { + val contentLength = jResponse.headers.get(HttpHeaderNames.CONTENT_LENGTH) match { + case null => None + case value => Some(value.toLong) + } + NettyBodyWriter.writeAndFlush(response.body, contentLength, ctx) + } else None } } diff --git a/zio-http/src/test/scala/zio/http/netty/NettyStreamBodySpec.scala b/zio-http/src/test/scala/zio/http/netty/NettyStreamBodySpec.scala index fc53412848..11a93802d2 100644 --- a/zio-http/src/test/scala/zio/http/netty/NettyStreamBodySpec.scala +++ b/zio-http/src/test/scala/zio/http/netty/NettyStreamBodySpec.scala @@ -76,13 +76,9 @@ object NettyStreamBodySpec extends HttpRunnableSpec { ) client <- ZIO.service[Client] firstResponse <- makeRequest(client, port) - firstResponseBodyReceive <- firstResponse.body.asStream.chunks - .map(chunk => new String(chunk.toArray)) - .mapZIO { chunk => - atLeastOneChunkReceived.succeed(()) *> ZIO.succeed(chunk) - } - .runCollect - .fork + firstResponseBodyReceive <- firstResponse.body.asStream.chunks.mapZIO { chunk => + atLeastOneChunkReceived.succeed(()).as(chunk.asString) + }.runCollect.fork _ <- firstResponseQueue.offerAll(message.getBytes.toList) _ <- atLeastOneChunkReceived.await // saying that there will be no more data in the first response stream @@ -93,8 +89,8 @@ object NettyStreamBodySpec extends HttpRunnableSpec { // java.lang.IllegalStateException: unexpected message type: LastHttpContent" // exception will be thrown secondResponse <- makeRequest(client, port) - secondResponseBody <- secondResponse.body.asStream.chunks.map(chunk => new String(chunk.toArray)).runCollect - firstResponseBody <- firstResponseBodyReceive.join + secondResponseBody <- secondResponse.body.asStream.chunks.map(_.asString).runCollect + firstResponseBody <- firstResponseBodyReceive.join value = firstResponse.status == Status.Ok && // since response has not chunked transfer encoding header we can't guarantee that