From ab1fc3dc839a2dfd5920adc4d6ceb16d5782fe34 Mon Sep 17 00:00:00 2001 From: Jules Ivanic Date: Thu, 5 Dec 2024 23:38:57 +1100 Subject: [PATCH] Further optimise `zio.http.Body.FileBody.asStream` code Following https://github.com/zio/zio-http/pull/3215 --- .../shared/src/main/scala/zio/http/Body.scala | 46 +++++++++++-------- 1 file changed, 28 insertions(+), 18 deletions(-) diff --git a/zio-http/shared/src/main/scala/zio/http/Body.scala b/zio-http/shared/src/main/scala/zio/http/Body.scala index 00ba269aab..0ccfe858d5 100644 --- a/zio-http/shared/src/main/scala/zio/http/Body.scala +++ b/zio-http/shared/src/main/scala/zio/http/Body.scala @@ -462,7 +462,8 @@ object Body { override def asChunk(implicit trace: Trace): Task[Chunk[Byte]] = zioEmptyChunk override def asStream(implicit trace: Trace): ZStream[Any, Throwable, Byte] = ZStream.empty - override def isComplete: Boolean = true + + override def isComplete: Boolean = true override def isEmpty: Boolean = true @@ -471,7 +472,8 @@ object Body { override private[zio] def unsafeAsArray(implicit unsafe: Unsafe): Array[Byte] = Array.empty[Byte] override def contentType(newContentType: Body.ContentType): Body = this - override def contentType: Option[Body.ContentType] = None + + override def contentType: Option[Body.ContentType] = None override def knownContentLength: Option[Long] = Some(0L) } @@ -568,27 +570,35 @@ object Body { override def asStream(implicit trace: Trace): ZStream[Any, Throwable, Byte] = ZStream.unwrap { ZIO.blocking { - for { - r <- ZIO.attempt { + ZIO.suspendSucceed { + try { val fs = new FileInputStream(file) val size = Math.min(chunkSize.toLong, file.length()).toInt - (fs, size) - } - (fs, size) = r - } yield ZStream - .repeatZIOOption[Any, Throwable, Chunk[Byte]] { - for { - buffer <- ZIO.succeed(new Array[Byte](size)) - len <- ZIO.attempt(fs.read(buffer)).mapError(Some(_)) - bytes <- - if (len > 0) ZIO.succeed(Chunk.fromArray(buffer.slice(0, len))) - else ZIO.fail(None) - } yield bytes + val read: Task[Option[Chunk[Byte]]] = + ZIO.suspendSucceed { + try { + val buffer = new Array[Byte](size) + val len = fs.read(buffer) + if (len > 0) Exit.succeed(Some(Chunk.fromArray(buffer.slice(0, len)))) + else Exit.none + } catch { + case e: Throwable => Exit.fail(e) + } + } + + Exit.succeed { + // Optimised for our needs version of `ZIO.repeatZIOChunkOption` + ZStream + .unfoldChunkZIO(read)(_.map(_.map(_ -> read))) + .ensuring(ZIO.attempt(fs.close()).ignoreLogged) + } + } catch { + case e: Throwable => Exit.fail(e) } - .ensuring(ZIO.attempt(fs.close()).ignoreLogged) + } } - }.flattenChunks + } override def contentType(newContentType: Body.ContentType): Body = copy(contentType = Some(newContentType))