Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix stream delay 3 #2458

Merged
merged 2 commits into from
Sep 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 41 additions & 9 deletions zio-http/src/main/scala/zio/http/netty/NettyBodyWriter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ import io.netty.channel._
import io.netty.handler.codec.http.{DefaultHttpContent, LastHttpContent}
object NettyBodyWriter {

def writeAndFlush(body: Body, ctx: ChannelHandlerContext)(implicit trace: Trace): Option[Task[Unit]] =
def writeAndFlush(body: Body, contentLength: Option[Long], ctx: ChannelHandlerContext)(implicit
trace: Trace,
): Option[Task[Unit]] =
body match {
case body: ByteBufBody =>
ctx.write(body.byteBuf)
Expand Down Expand Up @@ -66,14 +68,44 @@ object NettyBodyWriter {
None
case StreamBody(stream, _, _) =>
Some(
stream.chunks.mapZIO { bytes =>
NettyFutureExecutor.executed {
ctx.writeAndFlush(new DefaultHttpContent(Unpooled.wrappedBuffer(bytes.toArray)))
}
}.runDrain.zipRight {
NettyFutureExecutor.executed {
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT)
}
contentLength match {
case Some(length) =>
stream.chunks
.runFoldZIO(length) { (remaining, bytes) =>
remaining - bytes.size match {
case 0L =>
NettyFutureExecutor.executed {
// Flushes the last body content and LastHttpContent together to avoid race conditions.
ctx.write(new DefaultHttpContent(Unpooled.wrappedBuffer(bytes.toArray)))
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT)
}.as(0L)

case n =>
NettyFutureExecutor.executed {
ctx.writeAndFlush(new DefaultHttpContent(Unpooled.wrappedBuffer(bytes.toArray)))
}.as(n)
}
}
.flatMap {
case 0L => ZIO.unit
case remaining =>
val actualLength = length - remaining
ZIO.logWarning(s"Expected Content-Length of $length, but sent $actualLength bytes") *>
NettyFutureExecutor.executed {
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT)
}
}

case None =>
stream.chunks.mapZIO { bytes =>
NettyFutureExecutor.executed {
ctx.writeAndFlush(new DefaultHttpContent(Unpooled.wrappedBuffer(bytes.toArray)))
}
}.runDrain.zipRight {
NettyFutureExecutor.executed {
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT)
}
}
},
)
case ChunkBody(data, _, _) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,6 @@ private[zio] object NettyResponseEncoder {
fastEncode(response, bytes)
} else {
val jHeaders = Conversions.headersToNetty(response.headers)
// Prevent client from closing connection before server writes EMPTY_LAST_CONTENT.
if (response.body.isInstanceOf[Body.StreamBody]) {
jHeaders.remove(HttpHeaderNames.CONTENT_LENGTH)
}
val jStatus = Conversions.statusToNetty(response.status)
val hasContentLength = jHeaders.contains(HttpHeaderNames.CONTENT_LENGTH)
if (!hasContentLength) jHeaders.set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ final class ClientInboundHandler(
ctx.writeAndFlush(fullRequest)
case _: HttpRequest =>
ctx.write(jReq)
NettyBodyWriter.writeAndFlush(req.body, ctx).foreach { effect =>
NettyBodyWriter.writeAndFlush(req.body, None, ctx).foreach { effect =>
rtm.run(ctx, NettyRuntime.noopEnsuring)(effect)(Unsafe.unsafe, trace)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,13 @@ private[zio] final case class ServerInboundHandler(
val jResponse = NettyResponseEncoder.encode(ctx, response, runtime)
// setServerTime(time, response, jResponse)
ctx.writeAndFlush(jResponse)
if (!jResponse.isInstanceOf[FullHttpResponse])
NettyBodyWriter.writeAndFlush(response.body, ctx)
else
if (!jResponse.isInstanceOf[FullHttpResponse]) {
val contentLength = jResponse.headers.get(HttpHeaderNames.CONTENT_LENGTH) match {
case null => None
case value => Some(value.toLong)
}
NettyBodyWriter.writeAndFlush(response.body, contentLength, ctx)
} else
None
}
}
Expand Down
4 changes: 4 additions & 0 deletions zio-http/src/test/scala/zio/http/StaticFileServerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ object StaticFileServerSpec extends HttpRunnableSpec {
val res = resourceOk.run().map(_.status)
assertZIO(res)(equalTo(Status.Ok))
},
test("should have content-length") {
val res = resourceOk.run().map(_.header(Header.ContentLength))
assertZIO(res)(isSome(equalTo(Header.ContentLength(7L))))
},
test("should have content") {
val res = resourceOk.run().flatMap(_.body.asString)
assertZIO(res)(equalTo("foo\nbar"))
Expand Down
37 changes: 16 additions & 21 deletions zio-http/src/test/scala/zio/http/netty/NettyStreamBodySpec.scala
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package zio.http.netty

import zio._
import zio.test.Assertion._
import zio.test.TestAspect.withLiveClock
import zio.test.{Spec, TestEnvironment, assert}
import zio.test.{Spec, TestEnvironment, assertTrue}

import zio.stream.{ZStream, ZStreamAspect}

Expand All @@ -20,7 +19,8 @@ object NettyStreamBodySpec extends HttpRunnableSpec {
handler(
http.Response(
status = Status.Ok,
// Content-Length header will be removed when the body is a stream
// content length header is important,
// in this case the server will not use chunked transfer encoding even if response is a stream
headers = Headers(Header.ContentLength(len)),
body = Body.fromStream(streams.next()),
),
Expand Down Expand Up @@ -77,7 +77,7 @@ object NettyStreamBodySpec extends HttpRunnableSpec {
client <- ZIO.service[Client]
firstResponse <- makeRequest(client, port)
firstResponseBodyReceive <- firstResponse.body.asStream.chunks.mapZIO { chunk =>
atLeastOneChunkReceived.succeed(()) *> ZIO.succeed(chunk.asString)
atLeastOneChunkReceived.succeed(()).as(chunk.asString)
}.runCollect.fork
_ <- firstResponseQueue.offerAll(message.getBytes.toList)
_ <- atLeastOneChunkReceived.await
Expand All @@ -91,23 +91,18 @@ object NettyStreamBodySpec extends HttpRunnableSpec {
secondResponse <- makeRequest(client, port)
secondResponseBody <- secondResponse.body.asStream.chunks.map(_.asString).runCollect
firstResponseBody <- firstResponseBodyReceive.join

assertFirst =
assert(firstResponse.status)(equalTo(Status.Ok)) &&
assert(firstResponse.headers.get(Header.ContentLength))(isNone) &&
assert(firstResponse.headers.get(Header.TransferEncoding))(
isSome(equalTo(Header.TransferEncoding.Chunked)),
) &&
assert(firstResponseBody.reduce(_ + _))(equalTo(message))

assertSecond =
assert(secondResponse.status)(equalTo(Status.Ok)) &&
assert(secondResponse.headers.get(Header.ContentLength))(isNone) &&
assert(secondResponse.headers.get(Header.TransferEncoding))(
isSome(equalTo(Header.TransferEncoding.Chunked)),
) &&
assert(secondResponseBody)(equalTo(Chunk(message, "")))
} yield assertFirst && assertSecond
value =
firstResponse.status == Status.Ok &&
// since response has not chunked transfer encoding header we can't guarantee that
// received chunks will be the same as it was transferred. So we need to check the whole body
firstResponseBody.reduce(_ + _) == message &&
secondResponse.status == Status.Ok &&
secondResponseBody == Chunk(message)
} yield {
assertTrue(
value,
)
}
},
).provide(
singleConnectionClient,
Expand Down