Skip to content

Commit

Permalink
Merge branch 'main' into optimise_fromServerSentEvents
Browse files Browse the repository at this point in the history
  • Loading branch information
guizmaii authored Dec 14, 2024
2 parents c0bf0d0 + 1519ce6 commit 0069e40
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 23 deletions.
10 changes: 9 additions & 1 deletion zio-http/jvm/src/test/scala/zio/http/MediaTypeSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ object MediaTypeSpec extends ZIOHttpSpec {
}
},
test("custom mime type parsing") {
assertTrue(MediaType.parseCustomMediaType("custom/mime").contains(MediaType("custom", "mime")))
assertTrue(MediaType.parseCustomMediaType("custom/mime").contains(MediaType("custom", "mime", binary = true)))
},
test("optional parameter parsing") {
assertTrue(
Expand All @@ -48,5 +48,13 @@ object MediaTypeSpec extends ZIOHttpSpec {
),
)
},
test("application/x-zip-compressed should be binary") {
val mediaType = MediaType.forContentType("application/x-zip-compressed")
assertTrue(mediaType.exists(_.binary))
},
test("text/plain should not be binary") {
val mediaType = MediaType.forContentType("text/plain")
assertTrue(mediaType.exists(!_.binary))
},
)
}
46 changes: 28 additions & 18 deletions zio-http/shared/src/main/scala/zio/http/Body.scala
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,8 @@ object Body {
override def asChunk(implicit trace: Trace): Task[Chunk[Byte]] = zioEmptyChunk

override def asStream(implicit trace: Trace): ZStream[Any, Throwable, Byte] = ZStream.empty
override def isComplete: Boolean = true

override def isComplete: Boolean = true

override def isEmpty: Boolean = true

Expand All @@ -471,7 +472,8 @@ object Body {
override private[zio] def unsafeAsArray(implicit unsafe: Unsafe): Array[Byte] = Array.empty[Byte]

override def contentType(newContentType: Body.ContentType): Body = this
override def contentType: Option[Body.ContentType] = None

override def contentType: Option[Body.ContentType] = None

override def knownContentLength: Option[Long] = Some(0L)
}
Expand Down Expand Up @@ -568,27 +570,35 @@ object Body {
override def asStream(implicit trace: Trace): ZStream[Any, Throwable, Byte] =
ZStream.unwrap {
ZIO.blocking {
for {
r <- ZIO.attempt {
ZIO.suspendSucceed {
try {
val fs = new FileInputStream(file)
val size = Math.min(chunkSize.toLong, file.length()).toInt

(fs, size)
}
(fs, size) = r
} yield ZStream
.repeatZIOOption[Any, Throwable, Chunk[Byte]] {
for {
buffer <- ZIO.succeed(new Array[Byte](size))
len <- ZIO.attempt(fs.read(buffer)).mapError(Some(_))
bytes <-
if (len > 0) ZIO.succeed(Chunk.fromArray(buffer.slice(0, len)))
else ZIO.fail(None)
} yield bytes
val read: Task[Option[Chunk[Byte]]] =
ZIO.suspendSucceed {
try {
val buffer = new Array[Byte](size)
val len = fs.read(buffer)
if (len > 0) Exit.succeed(Some(Chunk.fromArray(buffer.slice(0, len))))
else Exit.none
} catch {
case e: Throwable => Exit.fail(e)
}
}

Exit.succeed {
// Optimised for our needs version of `ZIO.repeatZIOChunkOption`
ZStream
.unfoldChunkZIO(read)(_.map(_.map(_ -> read)))
.ensuring(ZIO.attempt(fs.close()).ignoreLogged)
}
} catch {
case e: Throwable => Exit.fail(e)
}
.ensuring(ZIO.attempt(fs.close()).ignoreLogged)
}
}
}.flattenChunks
}

override def contentType(newContentType: Body.ContentType): Body = copy(contentType = Some(newContentType))

Expand Down
17 changes: 17 additions & 0 deletions zio-http/shared/src/main/scala/zio/http/Handler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,11 @@ object Handler extends HandlerPlatformSpecific with HandlerVersionSpecific {

def fromFunctionHandler[In]: FromFunctionHandler[In] = new FromFunctionHandler[In](())

/**
* Creates a Handler from an pure function from A to Either[E,B]
*/
def fromFunctionEither[In]: FromFunctionEither[In] = new FromFunctionEither[In](())

/**
* Creates a Handler from an pure function from A to HExit[R,E,B]
*/
Expand Down Expand Up @@ -1222,6 +1227,18 @@ object Handler extends HandlerPlatformSpecific with HandlerVersionSpecific {
}
}

final class FromFunctionEither[In](val self: Unit) extends AnyVal {
def apply[R, Err, Out](f: In => Either[Err, Out]): Handler[Any, Err, In, Out] =
new Handler[Any, Err, In, Out] {
override def apply(in: In): ZIO[Any, Err, Out] =
try {
Exit.fromEither(f(in))
} catch {
case error: Throwable => Exit.die(error)
}
}
}

final class FromFunctionExit[In](val self: Unit) extends AnyVal {
def apply[R, Err, Out](f: In => Exit[Err, Out]): Handler[Any, Err, In, Out] =
new Handler[Any, Err, In, Out] {
Expand Down
5 changes: 5 additions & 0 deletions zio-http/shared/src/main/scala/zio/http/MediaType.scala
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,14 @@ object MediaType extends MediaTypes {
val contentTypeParts = customMediaType.split('/')
if (contentTypeParts.length == 2) {
val subtypeParts = contentTypeParts(1).split(';')
// Default binary to true for unknown types unless they belong to text families
val isBinary = customMediaType != "*/*" && customMediaType != "text/*" && !customMediaType.startsWith("text/")
if (subtypeParts.length >= 1) {
Some(
MediaType(
mainType = contentTypeParts.head,
subType = subtypeParts.head,
binary = isBinary,
parameters = if (subtypeParts.length >= 2) parseOptionalParameters(subtypeParts.tail) else Map.empty,
),
)
Expand All @@ -80,10 +83,12 @@ object MediaType extends MediaTypes {
val contentTypeParts = customMediaType.split('/')
if (contentTypeParts.length == 2) {
val subtypeParts = contentTypeParts(1).split(';')
val isBinary = customMediaType != "*/*" && customMediaType != "text/*" && !customMediaType.startsWith("text/")
if (subtypeParts.length >= 1) {
MediaType(
mainType = contentTypeParts.head,
subType = subtypeParts.head,
binary = isBinary,
parameters = if (subtypeParts.length >= 2) parseOptionalParameters(subtypeParts.tail) else Map.empty,
)
} else throw new IllegalArgumentException(s"Invalid media type $customMediaType")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,9 +303,10 @@ final case class Endpoint[PathInput, Input, Err, Output, Auth <: AuthType](
)
.getOrElse(defaultMediaTypes)
(endpoint.input ++ authCodec(endpoint.authType)).decodeRequest(request, config).orDie.flatMap { value =>
original(value).map(endpoint.output.encodeResponse(_, outputMediaTypes, config)).catchAll { error =>
ZIO.succeed(endpoint.error.encodeResponse(error, outputMediaTypes, config))
}
original(value).foldZIO(
success = output => Exit.succeed(endpoint.output.encodeResponse(output, outputMediaTypes, config)),
failure = error => Exit.succeed(endpoint.error.encodeResponse(error, outputMediaTypes, config)),
)
}
} -> condition
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ object JsonSchema {
refType,
seenWithCurrent,
)
nested.rootRef.map(k => nested.children + (k -> nested.root)).getOrElse(nested.children)
nested.rootRef.fold(ifEmpty = nested.children)(k => nested.children + (k -> nested.root))
}
.toMap
JsonSchemas(fromZSchema(record, SchemaStyle.Inline), ref, children)
Expand Down

2 comments on commit 0069e40

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚀 : Performance Benchmarks (PlainTextBenchmarkServer)

concurrency: 256
requests/sec: 376954

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚀 : Performance Benchmarks (SimpleEffectBenchmarkServer)

concurrency: 256
requests/sec: 370972

Please sign in to comment.