Skip to content

Commit

Permalink
Content-Length, chunked transfer encoding and Host header improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
vigoo committed Dec 22, 2023
1 parent f4b68d2 commit 48b74b5
Show file tree
Hide file tree
Showing 24 changed files with 223 additions and 133 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ private[cli] object Retriever {

override def retrieve(): Task[FormField] =
for {
chunk <- Body.fromFile(new java.io.File(path.toUri())).asChunk
chunk <- Body.fromFile(new java.io.File(path.toUri())).flatMap(_.asChunk)
} yield FormField.binaryField(name, chunk, media)

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ object FileStreaming extends ZIOAppDefault {

// Read the file as ZStream
// Uses the blocking version of ZStream.fromFile
Method.GET / "blocking" -> Handler.fromStream(ZStream.fromPath(Paths.get("README.md"))),
Method.GET / "blocking" -> Handler.fromStreamChunked(ZStream.fromPath(Paths.get("README.md"))),

// Uses netty's capability to write file content to the Channel
// Content-type response headers are automatically identified and added
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ object RequestStreaming extends ZIOAppDefault {

// Creating HttpData from the stream
// This works for file of any size
val data = Body.fromStream(stream)
val data = Body.fromStreamChunked(stream)

Response(body = data)
}).toHttpApp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ object StreamingResponse extends ZIOAppDefault {
handler(
http.Response(
status = Status.Ok,
headers = Headers(Header.ContentLength(message.length.toLong)),
body = Body.fromStream(ZStream.fromChunk(message)), // Encoding content using a ZStream
body = Body.fromStream(ZStream.fromChunk(message), message.length.toLong), // Encoding content using a ZStream
),
),
).toHttpApp
Expand Down
63 changes: 51 additions & 12 deletions zio-http/src/main/scala/zio/http/Body.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package zio.http

import java.io.FileInputStream
import java.io.{FileInputStream, IOException}
import java.nio.charset._
import java.nio.file._

Expand Down Expand Up @@ -120,6 +120,11 @@ trait Body { self =>
*/
def isComplete: Boolean

/**
* Returns whether or not the content length is known
*/
def knownContentLength: Option[Long]

/**
* Returns whether or not the body is known to be empty. Note that some bodies
* may not be known to be empty until an attempt is made to consume them.
Expand Down Expand Up @@ -167,8 +172,10 @@ object Body {
/**
* Constructs a [[zio.http.Body]] from the contents of a file.
*/
def fromFile(file: java.io.File, chunkSize: Int = 1024 * 4): Body =
FileBody(file, chunkSize)
def fromFile(file: java.io.File, chunkSize: Int = 1024 * 4)(implicit trace: Trace): ZIO[Any, Nothing, Body] =
ZIO.succeed(file.length()).map { fileSize =>
FileBody(file, chunkSize, fileSize)
}

/**
* Constructs a [[zio.http.Body]] from from form data, using multipart
Expand All @@ -180,7 +187,7 @@ object Body {
)(implicit trace: Trace): Body = {
val bytes = form.multipartBytes(specificBoundary)

StreamBody(bytes, Some(MediaType.multipart.`form-data`), Some(specificBoundary))
StreamBody(bytes, knownContentLength = None, Some(MediaType.multipart.`form-data`), Some(specificBoundary))
}

/**
Expand All @@ -192,26 +199,48 @@ object Body {
form: Form,
)(implicit trace: Trace): UIO[Body] =
form.multipartBytesUUID.map { case (boundary, bytes) =>
StreamBody(bytes, Some(MediaType.multipart.`form-data`), Some(boundary))
StreamBody(bytes, knownContentLength = None, Some(MediaType.multipart.`form-data`), Some(boundary))
}

/**
* Constructs a [[zio.http.Body]] from a stream of bytes.
* Constructs a [[zio.http.Body]] from a stream of bytes with a known length.
*/
def fromStream(stream: ZStream[Any, Throwable, Byte], contentLength: Long): Body =
StreamBody(stream, knownContentLength = Some(contentLength))

/**
* Constructs a [[zio.http.Body]] from a stream of bytes of unknown length,
* using chunked transfer encoding.
*/
def fromStream(stream: ZStream[Any, Throwable, Byte]): Body =
StreamBody(stream)
def fromStreamChunked(stream: ZStream[Any, Throwable, Byte]): Body =
StreamBody(stream, knownContentLength = None)

/**
* Constructs a [[zio.http.Body]] from a stream of text, using the specified
* character set, which defaults to the HTTP character set.
* Constructs a [[zio.http.Body]] from a stream of text with known length,
* using the specified character set, which defaults to the HTTP character
* set.
*/
def fromCharSequenceStream(
stream: ZStream[Any, Throwable, CharSequence],
contentLength: Long,
charset: Charset = Charsets.Http,
)(implicit
trace: Trace,
): Body =
fromStream(stream.map(seq => Chunk.fromArray(seq.toString.getBytes(charset))).flattenChunks)
fromStream(stream.map(seq => Chunk.fromArray(seq.toString.getBytes(charset))).flattenChunks, contentLength)

/**
* Constructs a [[zio.http.Body]] from a stream of text with unknown length
* using chunked transfer encoding, using the specified character set, which
* defaults to the HTTP character set.
*/
def fromCharSequenceStreamChunked(
stream: ZStream[Any, Throwable, CharSequence],
charset: Charset = Charsets.Http,
)(implicit
trace: Trace,
): Body =
fromStreamChunked(stream.map(seq => Chunk.fromArray(seq.toString.getBytes(charset))).flattenChunks)

/**
* Helper to create Body from String
Expand Down Expand Up @@ -262,6 +291,8 @@ object Body {
override def contentType(newMediaType: MediaType): Body = EmptyBody

override def contentType(newMediaType: MediaType, newBoundary: Boundary): Body = EmptyBody

override def knownContentLength: Option[Long] = Some(0L)
}

private[zio] final case class ChunkBody(
Expand Down Expand Up @@ -291,11 +322,14 @@ object Body {

override def contentType(newMediaType: MediaType, newBoundary: Boundary): Body =
copy(mediaType = Some(newMediaType), boundary = boundary.orElse(Some(newBoundary)))

override def knownContentLength: Option[Long] = Some(data.length.toLong)
}

private[zio] final case class FileBody(
val file: java.io.File,
file: java.io.File,
chunkSize: Int = 1024 * 4,
fileSize: Long,
override val mediaType: Option[MediaType] = None,
override val boundary: Option[Boundary] = None,
) extends Body
Expand Down Expand Up @@ -339,10 +373,13 @@ object Body {

override def contentType(newMediaType: MediaType, newBoundary: Boundary): Body =
copy(mediaType = Some(newMediaType), boundary = boundary.orElse(Some(newBoundary)))

override def knownContentLength: Option[Long] = Some(fileSize)
}

private[zio] final case class StreamBody(
stream: ZStream[Any, Throwable, Byte],
knownContentLength: Option[Long],
override val mediaType: Option[MediaType] = None,
override val boundary: Option[Boundary] = None,
) extends Body {
Expand Down Expand Up @@ -385,6 +422,8 @@ object Body {

def contentType(newMediaType: zio.http.MediaType, newBoundary: zio.http.Boundary): zio.http.Body = this

override def knownContentLength: Option[Long] = Some(0L)

}

private val zioEmptyArray = ZIO.succeed(Array.empty[Byte])(Trace.empty)
Expand Down
64 changes: 45 additions & 19 deletions zio-http/src/main/scala/zio/http/Handler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -832,17 +832,18 @@ object Handler {
ZIO.fail(new AccessDeniedException(file.getAbsolutePath))
} else {
if (file.isFile) {
val length = Headers(Header.ContentLength(file.length()))
val response = http.Response(headers = length, body = Body.fromFile(file))
val pathName = file.toPath.toString

// Set MIME type in the response headers. This is only relevant in
// case of RandomAccessFile transfers as browsers use the MIME type,
// not the file extension, to determine how to process a URL.
// {{{<a href="MSDN Doc">https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Type</a>}}}
determineMediaType(pathName) match {
case Some(mediaType) => ZIO.succeed(response.addHeader(Header.ContentType(mediaType)))
case None => ZIO.succeed(response)
Body.fromFile(file).flatMap { body =>
val response = http.Response(body = body)
val pathName = file.toPath.toString

// Set MIME type in the response headers. This is only relevant in
// case of RandomAccessFile transfers as browsers use the MIME type,
// not the file extension, to determine how to process a URL.
// {{{<a href="MSDN Doc">https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Type</a>}}}
determineMediaType(pathName) match {
case Some(mediaType) => ZIO.succeed(response.addHeader(Header.ContentType(mediaType)))
case None => ZIO.succeed(response)
}
}
} else {
ZIO.fail(new NotDirectoryException(s"Found directory instead of a file."))
Expand Down Expand Up @@ -897,11 +898,10 @@ object Handler {
.acquireReleaseWith(openZip)(closeZip)
.mapZIO(jar => ZIO.attemptBlocking(jar.getEntry(resourcePath) -> jar))
.flatMap { case (entry, jar) => ZStream.fromInputStream(jar.getInputStream(entry)) }
response = Response(body = Body.fromStream(inZStream))
response = Response(body = Body.fromStream(inZStream, contentLength))
} yield mediaType.fold(response) { t =>
response
.addHeader(Header.ContentType(t))
.addHeader(Header.ContentLength(contentLength))
}
}
}
Expand All @@ -913,27 +913,53 @@ object Handler {

/**
* Creates a Handler that always succeeds with a 200 status code and the
* provided ZStream as the body
* provided ZStream with a known content length as the body
*/
def fromStream[R](stream: ZStream[R, Throwable, String], charset: Charset = Charsets.Http)(implicit
def fromStream[R](stream: ZStream[R, Throwable, String], contentLength: Long, charset: Charset = Charsets.Http)(
implicit trace: Trace,
): Handler[R, Throwable, Any, Response] =
Handler.fromZIO {
ZIO.environment[R].map { env =>
fromBody(Body.fromCharSequenceStream(stream.provideEnvironment(env), contentLength, charset))
}
}.flatten

/**
* Creates a Handler that always succeeds with a 200 status code and the
* provided ZStream with a known content length as the body
*/
def fromStream[R](stream: ZStream[R, Throwable, Byte], contentLength: Long)(implicit
trace: Trace,
): Handler[R, Throwable, Any, Response] =
Handler.fromZIO {
ZIO.environment[R].map { env =>
fromBody(Body.fromStream(stream.provideEnvironment(env), contentLength))
}
}.flatten

/**
* Creates a Handler that always succeeds with a 200 status code and the
* provided ZStream as the body using chunked transfer encoding
*/
def fromStreamChunked[R](stream: ZStream[R, Throwable, String], charset: Charset = Charsets.Http)(implicit
trace: Trace,
): Handler[R, Throwable, Any, Response] =
Handler.fromZIO {
ZIO.environment[R].map { env =>
fromBody(Body.fromCharSequenceStream(stream.provideEnvironment(env), charset))
fromBody(Body.fromCharSequenceStreamChunked(stream.provideEnvironment(env), charset))
}
}.flatten

/**
* Creates a Handler that always succeeds with a 200 status code and the
* provided ZStream as the body
* provided ZStream as the body using chunked transfer encoding
*/
def fromStream[R](stream: ZStream[R, Throwable, Byte])(implicit
def fromStreamChunked[R](stream: ZStream[R, Throwable, Byte])(implicit
trace: Trace,
): Handler[R, Throwable, Any, Response] =
Handler.fromZIO {
ZIO.environment[R].map { env =>
fromBody(Body.fromStream(stream.provideEnvironment(env)))
fromBody(Body.fromStreamChunked(stream.provideEnvironment(env)))
}
}.flatten

Expand Down
2 changes: 1 addition & 1 deletion zio-http/src/main/scala/zio/http/Response.scala
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ object Response {
* \- stream of data to be sent as Server Sent Events
*/
def fromServerSentEvents(data: ZStream[Any, Nothing, ServerSentEvent])(implicit trace: Trace): Response =
Response(Status.Ok, contentTypeEventStream, Body.fromCharSequenceStream(data.map(_.encode)))
Response(Status.Ok, contentTypeEventStream, Body.fromCharSequenceStreamChunked(data.map(_.encode)))

/**
* Creates a new response for the provided socket app
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,10 @@ private[http] object BodyCodec {
ZIO.succeed((body.asStream >>> ZPipeline.decodeCharsWith(Charset.defaultCharset()) >>> codec.streamDecoder).orDie)

def encodeToBody(value: ZStream[Any, Nothing, E], codec: BinaryCodec[E])(implicit trace: Trace): Body =
Body.fromStream(value >>> codec.streamEncoder)
Body.fromStreamChunked(value >>> codec.streamEncoder)

def encodeToBody(value: ZStream[Any, Nothing, E], codec: Codec[String, Char, E])(implicit trace: Trace): Body =
Body.fromStream(value >>> codec.streamEncoder.map(_.toByte))
Body.fromStreamChunked(value >>> codec.streamEncoder.map(_.toByte))

type Element = E
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -530,13 +530,15 @@ private[codec] object EncoderDecoder {
} else None
private def encodeBody(inputs: Array[Any], contentType: => Header.ContentType): Body = {
if (isByteStream) {
Body.fromStream(inputs(0).asInstanceOf[ZStream[Any, Nothing, Byte]])
Body.fromStreamChunked(inputs(0).asInstanceOf[ZStream[Any, Nothing, Byte]])
} else {
if (inputs.length > 1) {
Body.fromMultipartForm(encodeMultipartFormData(inputs), formBoundary)
} else {
if (isEventStream) {
Body.fromCharSequenceStream(inputs(0).asInstanceOf[ZStream[Any, Nothing, ServerSentEvent]].map(_.encode))
Body.fromCharSequenceStreamChunked(
inputs(0).asInstanceOf[ZStream[Any, Nothing, ServerSentEvent]].map(_.encode),
)
} else if (inputs.length < 1) {
Body.empty
} else {
Expand Down
13 changes: 12 additions & 1 deletion zio-http/src/main/scala/zio/http/netty/NettyBody.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,14 @@ object NettyBody extends BodyEncoding {

private[zio] def fromAsync(
unsafeAsync: UnsafeAsync => Unit,
knownContentLength: Option[Long],
contentTypeHeader: Option[Header.ContentType] = None,
): Body = AsyncBody(unsafeAsync, contentTypeHeader.map(_.mediaType), contentTypeHeader.flatMap(_.boundary))
): Body = AsyncBody(
unsafeAsync,
knownContentLength,
contentTypeHeader.map(_.mediaType),
contentTypeHeader.flatMap(_.boundary),
)

/**
* Helper to create Body from ByteBuf
Expand Down Expand Up @@ -79,6 +85,8 @@ object NettyBody extends BodyEncoding {

override def contentType(newMediaType: MediaType, newBoundary: Boundary): Body =
copy(mediaType = Some(newMediaType), boundary = boundary.orElse(Some(newBoundary)))

override def knownContentLength: Option[Long] = Some(asciiString.length().toLong)
}

private[zio] final case class ByteBufBody(
Expand Down Expand Up @@ -109,10 +117,13 @@ object NettyBody extends BodyEncoding {

override def contentType(newMediaType: MediaType, newBoundary: Boundary): Body =
copy(mediaType = Some(newMediaType), boundary = boundary.orElse(Some(newBoundary)))

override def knownContentLength: Option[Long] = Some(byteBuf.readableBytes().toLong)
}

private[zio] final case class AsyncBody(
unsafeAsync: UnsafeAsync => Unit,
knownContentLength: Option[Long],
override val mediaType: Option[MediaType] = None,
override val boundary: Option[Boundary] = None,
) extends Body
Expand Down
Loading

0 comments on commit 48b74b5

Please sign in to comment.