From e195aa439b63079b7263f91a644d6c01b5db4425 Mon Sep 17 00:00:00 2001 From: Adam Fraser Date: Fri, 13 Oct 2023 01:34:41 -0700 Subject: [PATCH] Increase Buffer Size In Converting Async Body To Stream (#2478) * enable test * enable test * revert * fix test * try to enable some more tests * revert --- .../main/scala/zio/http/netty/NettyBody.scala | 30 ++++++++++--------- .../zio/http/endpoint/RoundtripSpec.scala | 4 +-- 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/zio-http/src/main/scala/zio/http/netty/NettyBody.scala b/zio-http/src/main/scala/zio/http/netty/NettyBody.scala index efa698ce52..cac6af0a5c 100644 --- a/zio-http/src/main/scala/zio/http/netty/NettyBody.scala +++ b/zio-http/src/main/scala/zio/http/netty/NettyBody.scala @@ -123,21 +123,23 @@ object NettyBody extends BodyEncoding { override def asStream(implicit trace: Trace): ZStream[Any, Throwable, Byte] = ZStream - .async[Any, Throwable, Byte](emit => - try { - unsafeAsync(new UnsafeAsync { - override def apply(message: Chunk[Byte], isLast: Boolean): Unit = { - emit(ZIO.succeed(message)) - if (isLast) { - emit(ZIO.fail(None)) + .async[Any, Throwable, Byte]( + emit => + try { + unsafeAsync(new UnsafeAsync { + override def apply(message: Chunk[Byte], isLast: Boolean): Unit = { + emit(ZIO.succeed(message)) + if (isLast) { + emit(ZIO.fail(None)) + } } - } - override def fail(cause: Throwable): Unit = - emit(ZIO.fail(Some(cause))) - }) - } catch { - case e: Throwable => emit(ZIO.fail(Option(e))) - }, + override def fail(cause: Throwable): Unit = + emit(ZIO.fail(Some(cause))) + }) + } catch { + case e: Throwable => emit(ZIO.fail(Option(e))) + }, + 4096, ) override def isComplete: Boolean = false diff --git a/zio-http/src/test/scala/zio/http/endpoint/RoundtripSpec.scala b/zio-http/src/test/scala/zio/http/endpoint/RoundtripSpec.scala index 6172b35a3b..eff5ce37ab 100644 --- a/zio-http/src/test/scala/zio/http/endpoint/RoundtripSpec.scala +++ b/zio-http/src/test/scala/zio/http/endpoint/RoundtripSpec.scala @@ -291,7 +291,7 @@ object RoundtripSpec extends ZIOHttpSpec { ("name", 10, Post(1, "title", "body", 111)), "name: name, value: 10, post: Post(1,title,body,111)", ) - } @@ ifEnvNotSet("CI"), + }, test("endpoint error returned") { val api = Endpoint(POST / "test") .outError[String](Status.Custom(999)) @@ -460,7 +460,7 @@ object RoundtripSpec extends ZIOHttpSpec { s"name: xyz, value: 100, count: ${1024 * 1024}", ) } - } @@ ifEnvNotSet("CI"), + }, ).provide( Server.live, ZLayer.succeed(Server.Config.default.onAnyOpenPort.enableRequestStreaming),