Skip to content

Commit

Permalink
Fix stream delay 3 (#2458)
Browse files Browse the repository at this point in the history
* Revert "Fix stream body delay 2 (#2420)"

This reverts commit 70aabfc.

* Proper fix for #2284
  • Loading branch information
guersam authored Sep 24, 2023
1 parent 07df316 commit 04e702c
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 38 deletions.
50 changes: 41 additions & 9 deletions zio-http/src/main/scala/zio/http/netty/NettyBodyWriter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ import io.netty.channel._
import io.netty.handler.codec.http.{DefaultHttpContent, LastHttpContent}
object NettyBodyWriter {

def writeAndFlush(body: Body, ctx: ChannelHandlerContext)(implicit trace: Trace): Option[Task[Unit]] =
def writeAndFlush(body: Body, contentLength: Option[Long], ctx: ChannelHandlerContext)(implicit
trace: Trace,
): Option[Task[Unit]] =
body match {
case body: ByteBufBody =>
ctx.write(body.byteBuf)
Expand Down Expand Up @@ -66,14 +68,44 @@ object NettyBodyWriter {
None
case StreamBody(stream, _, _) =>
Some(
stream.chunks.mapZIO { bytes =>
NettyFutureExecutor.executed {
ctx.writeAndFlush(new DefaultHttpContent(Unpooled.wrappedBuffer(bytes.toArray)))
}
}.runDrain.zipRight {
NettyFutureExecutor.executed {
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT)
}
contentLength match {
case Some(length) =>
stream.chunks
.runFoldZIO(length) { (remaining, bytes) =>
remaining - bytes.size match {
case 0L =>
NettyFutureExecutor.executed {
// Flushes the last body content and LastHttpContent together to avoid race conditions.
ctx.write(new DefaultHttpContent(Unpooled.wrappedBuffer(bytes.toArray)))
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT)
}.as(0L)

case n =>
NettyFutureExecutor.executed {
ctx.writeAndFlush(new DefaultHttpContent(Unpooled.wrappedBuffer(bytes.toArray)))
}.as(n)
}
}
.flatMap {
case 0L => ZIO.unit
case remaining =>
val actualLength = length - remaining
ZIO.logWarning(s"Expected Content-Length of $length, but sent $actualLength bytes") *>
NettyFutureExecutor.executed {
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT)
}
}

case None =>
stream.chunks.mapZIO { bytes =>
NettyFutureExecutor.executed {
ctx.writeAndFlush(new DefaultHttpContent(Unpooled.wrappedBuffer(bytes.toArray)))
}
}.runDrain.zipRight {
NettyFutureExecutor.executed {
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT)
}
}
},
)
case ChunkBody(data, _, _) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,6 @@ private[zio] object NettyResponseEncoder {
fastEncode(response, bytes)
} else {
val jHeaders = Conversions.headersToNetty(response.headers)
// Prevent client from closing connection before server writes EMPTY_LAST_CONTENT.
if (response.body.isInstanceOf[Body.StreamBody]) {
jHeaders.remove(HttpHeaderNames.CONTENT_LENGTH)
}
val jStatus = Conversions.statusToNetty(response.status)
val hasContentLength = jHeaders.contains(HttpHeaderNames.CONTENT_LENGTH)
if (!hasContentLength) jHeaders.set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ final class ClientInboundHandler(
ctx.writeAndFlush(fullRequest)
case _: HttpRequest =>
ctx.write(jReq)
NettyBodyWriter.writeAndFlush(req.body, ctx).foreach { effect =>
NettyBodyWriter.writeAndFlush(req.body, None, ctx).foreach { effect =>
rtm.run(ctx, NettyRuntime.noopEnsuring)(effect)(Unsafe.unsafe, trace)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,13 @@ private[zio] final case class ServerInboundHandler(
val jResponse = NettyResponseEncoder.encode(ctx, response, runtime)
// setServerTime(time, response, jResponse)
ctx.writeAndFlush(jResponse)
if (!jResponse.isInstanceOf[FullHttpResponse])
NettyBodyWriter.writeAndFlush(response.body, ctx)
else
if (!jResponse.isInstanceOf[FullHttpResponse]) {
val contentLength = jResponse.headers.get(HttpHeaderNames.CONTENT_LENGTH) match {
case null => None
case value => Some(value.toLong)
}
NettyBodyWriter.writeAndFlush(response.body, contentLength, ctx)
} else
None
}
}
Expand Down
4 changes: 4 additions & 0 deletions zio-http/src/test/scala/zio/http/StaticFileServerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ object StaticFileServerSpec extends HttpRunnableSpec {
val res = resourceOk.run().map(_.status)
assertZIO(res)(equalTo(Status.Ok))
},
test("should have content-length") {
val res = resourceOk.run().map(_.header(Header.ContentLength))
assertZIO(res)(isSome(equalTo(Header.ContentLength(7L))))
},
test("should have content") {
val res = resourceOk.run().flatMap(_.body.asString)
assertZIO(res)(equalTo("foo\nbar"))
Expand Down
37 changes: 16 additions & 21 deletions zio-http/src/test/scala/zio/http/netty/NettyStreamBodySpec.scala
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package zio.http.netty

import zio._
import zio.test.Assertion._
import zio.test.TestAspect.withLiveClock
import zio.test.{Spec, TestEnvironment, assert}
import zio.test.{Spec, TestEnvironment, assertTrue}

import zio.stream.{ZStream, ZStreamAspect}

Expand All @@ -20,7 +19,8 @@ object NettyStreamBodySpec extends HttpRunnableSpec {
handler(
http.Response(
status = Status.Ok,
// Content-Length header will be removed when the body is a stream
// content length header is important,
// in this case the server will not use chunked transfer encoding even if response is a stream
headers = Headers(Header.ContentLength(len)),
body = Body.fromStream(streams.next()),
),
Expand Down Expand Up @@ -77,7 +77,7 @@ object NettyStreamBodySpec extends HttpRunnableSpec {
client <- ZIO.service[Client]
firstResponse <- makeRequest(client, port)
firstResponseBodyReceive <- firstResponse.body.asStream.chunks.mapZIO { chunk =>
atLeastOneChunkReceived.succeed(()) *> ZIO.succeed(chunk.asString)
atLeastOneChunkReceived.succeed(()).as(chunk.asString)
}.runCollect.fork
_ <- firstResponseQueue.offerAll(message.getBytes.toList)
_ <- atLeastOneChunkReceived.await
Expand All @@ -91,23 +91,18 @@ object NettyStreamBodySpec extends HttpRunnableSpec {
secondResponse <- makeRequest(client, port)
secondResponseBody <- secondResponse.body.asStream.chunks.map(_.asString).runCollect
firstResponseBody <- firstResponseBodyReceive.join

assertFirst =
assert(firstResponse.status)(equalTo(Status.Ok)) &&
assert(firstResponse.headers.get(Header.ContentLength))(isNone) &&
assert(firstResponse.headers.get(Header.TransferEncoding))(
isSome(equalTo(Header.TransferEncoding.Chunked)),
) &&
assert(firstResponseBody.reduce(_ + _))(equalTo(message))

assertSecond =
assert(secondResponse.status)(equalTo(Status.Ok)) &&
assert(secondResponse.headers.get(Header.ContentLength))(isNone) &&
assert(secondResponse.headers.get(Header.TransferEncoding))(
isSome(equalTo(Header.TransferEncoding.Chunked)),
) &&
assert(secondResponseBody)(equalTo(Chunk(message, "")))
} yield assertFirst && assertSecond
value =
firstResponse.status == Status.Ok &&
// since response has not chunked transfer encoding header we can't guarantee that
// received chunks will be the same as it was transferred. So we need to check the whole body
firstResponseBody.reduce(_ + _) == message &&
secondResponse.status == Status.Ok &&
secondResponseBody == Chunk(message)
} yield {
assertTrue(
value,
)
}
},
).provide(
singleConnectionClient,
Expand Down

0 comments on commit 04e702c

Please sign in to comment.