-
Notifications
You must be signed in to change notification settings - Fork 411
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Further optimise zio.http.Body.FileBody.asStream
code
#3234
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -462,7 +462,8 @@ object Body { | |
override def asChunk(implicit trace: Trace): Task[Chunk[Byte]] = zioEmptyChunk | ||
|
||
override def asStream(implicit trace: Trace): ZStream[Any, Throwable, Byte] = ZStream.empty | ||
override def isComplete: Boolean = true | ||
|
||
override def isComplete: Boolean = true | ||
|
||
override def isEmpty: Boolean = true | ||
|
||
|
@@ -471,7 +472,8 @@ object Body { | |
override private[zio] def unsafeAsArray(implicit unsafe: Unsafe): Array[Byte] = Array.empty[Byte] | ||
|
||
override def contentType(newContentType: Body.ContentType): Body = this | ||
override def contentType: Option[Body.ContentType] = None | ||
|
||
override def contentType: Option[Body.ContentType] = None | ||
|
||
override def knownContentLength: Option[Long] = Some(0L) | ||
} | ||
|
@@ -568,27 +570,35 @@ object Body { | |
override def asStream(implicit trace: Trace): ZStream[Any, Throwable, Byte] = | ||
ZStream.unwrap { | ||
ZIO.blocking { | ||
for { | ||
r <- ZIO.attempt { | ||
ZIO.suspendSucceed { | ||
try { | ||
val fs = new FileInputStream(file) | ||
val size = Math.min(chunkSize.toLong, file.length()).toInt | ||
|
||
(fs, size) | ||
} | ||
(fs, size) = r | ||
} yield ZStream | ||
.repeatZIOOption[Any, Throwable, Chunk[Byte]] { | ||
for { | ||
buffer <- ZIO.succeed(new Array[Byte](size)) | ||
len <- ZIO.attempt(fs.read(buffer)).mapError(Some(_)) | ||
bytes <- | ||
if (len > 0) ZIO.succeed(Chunk.fromArray(buffer.slice(0, len))) | ||
else ZIO.fail(None) | ||
} yield bytes | ||
val read: Task[Option[Chunk[Byte]]] = | ||
ZIO.suspendSucceed { | ||
try { | ||
val buffer = new Array[Byte](size) | ||
val len = fs.read(buffer) | ||
if (len > 0) Exit.succeed(Some(Chunk.fromArray(buffer.slice(0, len)))) | ||
else Exit.none | ||
} catch { | ||
case e: Throwable => Exit.fail(e) | ||
} | ||
} | ||
|
||
Exit.succeed { | ||
// Optimised for our needs version of `ZIO.repeatZIOChunkOption` | ||
ZStream | ||
.unfoldChunkZIO(read)(_.map(_.map(_ -> read))) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @kyri-petrou Funnily enough, if we were managing to find a way to write a recursive type in Scala,
type Read = Task[Option[(Chunk[Byte], Read)]]
val read: Read =
ZIO.suspendSucceed {
try {
val buffer = new Array[Byte](size)
val len = fs.read(buffer)
if (len > 0) Exit.succeed(Some(Chunk.fromArray(buffer.slice(0, len) -> read)))
else Exit.none
} catch {
case e: Throwable => Exit.fail(e)
}
}
Exit.succeed {
ZStream
.unfoldChunkZIO(read)(ZIO.identityFn)
.ensuring(ZIO.attempt(fs.close()).ignoreLogged)
} 😍 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I can imagine, that there is a nice way to do this for Scala 3. But I think we are unable to find a nice way for Scala 2 |
||
.ensuring(ZIO.attempt(fs.close()).ignoreLogged) | ||
} | ||
} catch { | ||
case e: Throwable => Exit.fail(e) | ||
} | ||
.ensuring(ZIO.attempt(fs.close()).ignoreLogged) | ||
} | ||
} | ||
}.flattenChunks | ||
} | ||
|
||
override def contentType(newContentType: Body.ContentType): Body = copy(contentType = Some(newContentType)) | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Replacing
ZStream.repeatZIOOption
withZStream.repeatZIOChunkOption
allows us to remove the.flattenChunks
Anyway,
repeatZIOOption
is usingrepeatZIOChunkOption
underneath: