Skip to content

Commit

Permalink
Further optimise zio.http.Body.FileBody.asStream code
Browse files Browse the repository at this point in the history
Following #3215
  • Loading branch information
guizmaii committed Dec 5, 2024
1 parent 1f8ef1f commit ab1fc3d
Showing 1 changed file with 28 additions and 18 deletions.
46 changes: 28 additions & 18 deletions zio-http/shared/src/main/scala/zio/http/Body.scala
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,8 @@ object Body {
override def asChunk(implicit trace: Trace): Task[Chunk[Byte]] = zioEmptyChunk

override def asStream(implicit trace: Trace): ZStream[Any, Throwable, Byte] = ZStream.empty
override def isComplete: Boolean = true

override def isComplete: Boolean = true

override def isEmpty: Boolean = true

Expand All @@ -471,7 +472,8 @@ object Body {
override private[zio] def unsafeAsArray(implicit unsafe: Unsafe): Array[Byte] = Array.empty[Byte]

override def contentType(newContentType: Body.ContentType): Body = this
override def contentType: Option[Body.ContentType] = None

override def contentType: Option[Body.ContentType] = None

override def knownContentLength: Option[Long] = Some(0L)
}
Expand Down Expand Up @@ -568,27 +570,35 @@ object Body {
override def asStream(implicit trace: Trace): ZStream[Any, Throwable, Byte] =
ZStream.unwrap {
ZIO.blocking {
for {
r <- ZIO.attempt {
ZIO.suspendSucceed {
try {
val fs = new FileInputStream(file)
val size = Math.min(chunkSize.toLong, file.length()).toInt

(fs, size)
}
(fs, size) = r
} yield ZStream
.repeatZIOOption[Any, Throwable, Chunk[Byte]] {
for {
buffer <- ZIO.succeed(new Array[Byte](size))
len <- ZIO.attempt(fs.read(buffer)).mapError(Some(_))
bytes <-
if (len > 0) ZIO.succeed(Chunk.fromArray(buffer.slice(0, len)))
else ZIO.fail(None)
} yield bytes
val read: Task[Option[Chunk[Byte]]] =
ZIO.suspendSucceed {
try {
val buffer = new Array[Byte](size)
val len = fs.read(buffer)
if (len > 0) Exit.succeed(Some(Chunk.fromArray(buffer.slice(0, len))))
else Exit.none
} catch {
case e: Throwable => Exit.fail(e)
}
}

Exit.succeed {
// Optimised for our needs version of `ZIO.repeatZIOChunkOption`
ZStream
.unfoldChunkZIO(read)(_.map(_.map(_ -> read)))
.ensuring(ZIO.attempt(fs.close()).ignoreLogged)
}
} catch {
case e: Throwable => Exit.fail(e)
}
.ensuring(ZIO.attempt(fs.close()).ignoreLogged)
}
}
}.flattenChunks
}

override def contentType(newContentType: Body.ContentType): Body = copy(contentType = Some(newContentType))

Expand Down

2 comments on commit ab1fc3d

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚀 : Performance Benchmarks (SimpleEffectBenchmarkServer)

concurrency: 256
requests/sec: 347399

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚀 : Performance Benchmarks (PlainTextBenchmarkServer)

concurrency: 256
requests/sec: 352338

Please sign in to comment.