Skip to content

Commit

Permalink
Added HybridContentLengthHandler (zio#3003)
Browse files Browse the repository at this point in the history
* 2908

* fixed build error

* closes zio#2908 - added hybridStreaming

* fixed formatting

* remove chunkWriter after usage

* added HybridContentLengthHandler and tests for multiple requests at once

* using Client.live for only 1 client in HybridSpec

* fixed HybridRequestStreamingServerSpec

---------

Co-authored-by: kyri-petrou <[email protected]>
Co-authored-by: Nabil Abdel-Hafeez <[email protected]>
  • Loading branch information
3 people authored Aug 29, 2024
1 parent 1f93176 commit e3930d8
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package zio.http.netty

import io.netty.channel._
import io.netty.handler.codec.http.HttpUtil.getContentLength
import io.netty.handler.codec.http._
import io.netty.handler.stream.ChunkedWriteHandler

class HybridContentLengthHandler(maxAggregatedLength: Int) extends ChannelInboundHandlerAdapter {
var maxLength = maxAggregatedLength
override def channelRead(ctx: ChannelHandlerContext, msg: Any): Unit = {
msg match {
case httpMessage: HttpMessage =>
val contentLength = getContentLength(httpMessage, -1L)
if (contentLength > maxAggregatedLength) {
if (ctx.pipeline().get(classOf[HttpObjectAggregator]) != null) {
ctx.pipeline().replace(classOf[HttpObjectAggregator], Names.ChunkedWriteHandler, new ChunkedWriteHandler())
}
} else {
if (ctx.pipeline().get(classOf[ChunkedWriteHandler]) != null) {
ctx
.pipeline()
.replace(classOf[ChunkedWriteHandler], Names.HttpObjectAggregator, new HttpObjectAggregator(maxLength))
}
}
case _ => // Ignore non-HTTP messages
}
ctx.fireChannelRead(msg): Unit
}
}
2 changes: 2 additions & 0 deletions zio-http/jvm/src/main/scala/zio/http/netty/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ package object netty {
val HttpResponseEncoder = "HTTP_RESPONSE_ENCODER"
val ProxyHandler = "PROXY_HANDLER"
val ReadTimeoutHandler = "READ_TIMEOUT_HANDLER"
var HybridContentLengthHandler = "HYBRID_CONTENT_LENGTH_HANDLER"
var ChunkedWriteHandler = "CHUNKED_WRITE_HANDLER"
}

implicit class BodyExtensions(val body: Body) extends AnyVal {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import zio.stacktracer.TracingImplicits.disableAutoTrace

import zio.http.Server
import zio.http.Server.RequestStreaming
import zio.http.netty.Names
import zio.http.netty.model.Conversions
import zio.http.netty.{HybridContentLengthHandler, Names}

import io.netty.channel.ChannelHandler.Sharable
import io.netty.channel._
Expand Down Expand Up @@ -83,9 +83,12 @@ private[zio] final case class ServerChannelInitializer(

// ObjectAggregator
cfg.requestStreaming match {
case RequestStreaming.Enabled =>
case RequestStreaming.Disabled(maximumContentLength) =>
case RequestStreaming.Enabled =>
case RequestStreaming.Disabled(maximumContentLength) =>
pipeline.addLast(Names.HttpObjectAggregator, new HttpObjectAggregator(maximumContentLength))
case RequestStreaming.Hybrid(aggregatedContentLength) =>
pipeline.addLast(Names.HybridContentLengthHandler, new HybridContentLengthHandler(aggregatedContentLength))
pipeline.addLast(Names.HttpObjectAggregator, new HttpObjectAggregator(aggregatedContentLength))
}

// ExpectContinueHandler
Expand Down
84 changes: 84 additions & 0 deletions zio-http/jvm/src/test/scala/zio/http/HybridStreamingSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright 2021 - 2023 Sporta Technologies PVT LTD & the ZIO HTTP contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package zio.http

import zio._
import zio.test.Assertion.{equalTo, isFalse, isTrue}
import zio.test.TestAspect.{diagnose, sequential, shrinks, withLiveClock}
import zio.test.{assert, suite, test}

import zio.http.ServerSpec.requestBodySpec
import zio.http.internal.{DynamicServer, HttpRunnableSpec}
import zio.http.netty.NettyConfig

object HybridRequestStreamingServerSpec extends HttpRunnableSpec {
def extractStatus(res: Response): Status = res.status

private val MaxSize = 1024 * 10

private val configAppWithHybridRequestStreaming =
Server.Config.default
.port(0)
.requestDecompression(true)
.hybridRequestStreaming(MaxSize)

private val appWithHybridReqStreaming = serve

/**
* Generates a string of the provided length and char.
*/
private def genString(size: Int, char: Char): String = {
val buffer = new Array[Char](size)
for (i <- 0 until size) buffer(i) = char
new String(buffer)
}

val hybridStreamingServerSpec = suite("HybridStreamingServerSpec")(
test("multiple requests with same connection") {
val sizes = List(MaxSize - 1, MaxSize + 1, MaxSize - 1)
val routes = Handler
.fromFunctionZIO[Request] { request =>
ZIO.succeed(Response.text(request.body.isComplete.toString))
}
.sandbox
.toRoutes

def requestContent(size: Int) = Request(body = Body.fromString(genString(size, '?')))

for {
responses <- ZIO.collectAll(sizes.map(size => routes.deploy(requestContent(size)).flatMap(_.body.asString)))
} yield {
val expectedResults = sizes.map(size => if (size > MaxSize) "false" else "true")
assert(responses)(equalTo(expectedResults))
}
},
)

override def spec =
suite("HybridRequestStreamingServerSpec") {
suite("app with hybrid request streaming") {
appWithHybridReqStreaming.as(List(requestBodySpec, hybridStreamingServerSpec))
}
}.provideShared(
DynamicServer.live,
ZLayer.succeed(configAppWithHybridRequestStreaming),
Server.customized,
ZLayer.succeed(NettyConfig.defaultWithFastShutdown),
Client.live,
ZLayer.succeed(ZClient.Config.default.maxHeaderSize(15000).maxInitialLineLength(15000).disabledConnectionPool),
DnsResolver.default,
) @@ diagnose(15.seconds) @@ sequential @@ shrinks(0) @@ withLiveClock
}
10 changes: 10 additions & 0 deletions zio-http/shared/src/main/scala/zio/http/Server.scala
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ object Server extends ServerPlatformSpecific {
/** Enables streaming request bodies */
def enableRequestStreaming: Config = self.copy(requestStreaming = RequestStreaming.Enabled)

/** Enables hybrid request streaming */
def hybridRequestStreaming(maxAggregatedLength: Int): Config =
self.copy(requestStreaming = RequestStreaming.Hybrid(maxAggregatedLength))

def gracefulShutdownTimeout(duration: Duration): Config = self.copy(gracefulShutdownTimeout = duration)

def idleTimeout(duration: Duration): Config = self.copy(idleTimeout = Some(duration))
Expand Down Expand Up @@ -416,6 +420,12 @@ object Server extends ServerPlatformSpecific {
*/
final case class Disabled(maximumContentLength: Int) extends RequestStreaming

/**
* Hybrid streaming option: Aggregate requests up to a certain size, and
* stream if larger.
*/
final case class Hybrid(maximumAggregatedLength: Int) extends RequestStreaming

lazy val config: zio.Config[RequestStreaming] =
(zio.Config.boolean("enabled").withDefault(true) ++
zio.Config.int("maximum-content-length").withDefault(1024 * 100)).map {
Expand Down

0 comments on commit e3930d8

Please sign in to comment.