Skip to content

Commit

Permalink
fix: significant performance drop with stream.toQueue (#468)
Browse files Browse the repository at this point in the history
  • Loading branch information
regiskuckaertz authored Jan 19, 2023
1 parent bac0610 commit 5d2ebdb
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 17 deletions.
53 changes: 53 additions & 0 deletions benchmarks/src/main/protobuf/testservice.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
syntax = "proto3";

option java_multiple_files = true;
option java_package = "scalapb.zio_grpc.helloworld";
option java_outer_classname = "HelloWorldProto";

package helloworld;

// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}

// Sends a litany of greetings
rpc SayHelloStreaming (HelloRequest) returns (stream HelloReply) {}
}

// The actual message exchanged by the client and the server.
message Hello {
string name = 1;
double d = 2;
float f = 3;
bool b = 4;
int32 n = 5;
int64 l = 6;
oneof choice {
string c1 = 7;
bool c2 = 8;
}
message Pet {
enum Color {
BLACK = 0;
WHITE = 1;
BLUE = 2;
RED = 3;
YELLOW = 4;
GREEN = 5;
}
string name = 1;
Color color = 2;
}
repeated Pet pets = 9;
}

// The request message from the client.
message HelloRequest {
Hello request = 1;
}

// The response message from the server.
message HelloReply {
Hello response = 1;
}
17 changes: 17 additions & 0 deletions benchmarks/src/main/scala/scalapb/zio_grpc/GreeterImpl.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package scalapb.zio_grpc

import scalapb.zio_grpc.helloworld.testservice.ZioTestservice.ZGreeter
import scalapb.zio_grpc.helloworld.testservice.{HelloReply, HelloRequest}
import io.grpc.Status
import zio.ZIO
import zio.stream.ZStream

class GreeterImpl(size: Long) extends ZGreeter[Any] {

def sayHello(request: HelloRequest): ZIO[Any, Status, HelloReply] =
ZIO.succeed(HelloReply(request.request))

def sayHelloStreaming(request: HelloRequest): ZStream[Any, Status, HelloReply] =
ZStream.repeat(HelloReply(request.request)).take(size)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package scalapb.zio_grpc

import io.grpc.ManagedChannelBuilder
import io.grpc.ServerBuilder
import scalapb.zio_grpc.helloworld.testservice._
import zio._
import java.time

object ServerStreamingBenchmarkApp extends ZIOAppDefault {

val size = 100000L

val server =
ServerLayer.fromEnvironment[ZioTestservice.Greeter](ServerBuilder.forPort(50051))

val client =
ZLayer.scoped[Server] {
for {
ss <- ZIO.service[Server]
port <- ss.port.orDie
ch = ManagedChannelBuilder.forAddress("localhost", 50051).usePlaintext()
client <- ZioTestservice.GreeterClient.scoped(ZManagedChannel(ch)).orDie
} yield client
}

val service =
ZLayer.succeed[ZioTestservice.Greeter] {
new GreeterImpl(size)
}

def run = ZIO
.foreach(Array(8192, 65536)) { queueSize =>
val props = java.lang.System.getProperties();
props.setProperty("zio-grpc.backpressure-queue-size", queueSize.toString());

for {
_ <- Console.printLine(s"Starting with queue size $queueSize")
cpt <- Ref.make(0)
start <- Clock.instant.flatMap(Ref.make(_))
result <- ZioTestservice.GreeterClient
.sayHelloStreaming(HelloRequest(request = Some(Hello(name = "Testing streaming"))))
.tap(_ => cpt.update(_ + 1))
.tap { _ =>
for {
now <- Clock.instant
started <- start.get
_ <- ZIO.when(time.Duration.between(started, now).toSeconds() >= 10)(
start.set(now) *> cpt.get.flatMap(cpt => Console.printLine(s"Received $cpt messages"))
)
} yield ()
}
.runDrain
.timed
_ <- Console.printLine(s"queue size: $queueSize (${result._1.toMillis()}ms)")
} yield ()
}
.provide(service >+> server >+> client)

}
26 changes: 26 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,32 @@ lazy val e2e =
testFrameworks += new TestFramework("zio.test.sbt.ZTestFramework")
)

lazy val benchmarks =
projectMatrix
.in(file("benchmarks"))
.dependsOn(core)
.defaultAxes()
.enablePlugins(LocalCodeGenPlugin)
.jvmPlatform(ScalaVersions)
.settings(stdSettings)
.settings(
crossScalaVersions := Seq(Scala212, Scala213),
publish / skip := true,
libraryDependencies ++= Seq(
"dev.zio" %% "zio" % Version.zio,
"com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapb.compiler.Version.scalapbVersion,
"io.grpc" % "grpc-netty" % Version.grpc
),
Compile / PB.targets := Seq(
scalapb.gen(grpc = true) -> (Compile / sourceManaged).value,
genModule(
"scalapb.zio_grpc.ZioCodeGenerator$"
) -> (Compile / sourceManaged).value
),
PB.protocVersion := "3.13.0",
codeGenClasspath := (codeGenJVM212 / Compile / fullClasspath).value
)

lazy val docs = project
.enablePlugins(LocalCodeGenPlugin)
.in(file("zio-grpc-docs"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import scalapb.zio_grpc.RequestContext
import io.grpc.Metadata
import scalapb.zio_grpc.SafeMetadata
import zio.stm.TSemaphore
import zio.stream.Take

class ZServerCallHandler[Req, Res](
runtime: Runtime[Any],
Expand Down Expand Up @@ -109,31 +108,29 @@ object ZServerCallHandler {
call: ZServerCall[Res],
stream: ZStream[Any, Status, Res]
): ZIO[Any, Status, Unit] = {
def innerLoop(queue: Dequeue[Take[Status, Res]], buffer: Ref[Chunk[Res]]): ZIO[Any, Status, Boolean] =
buffer
.modify(chunk => chunk.headOption -> chunk.drop(1))
def innerLoop(queue: Dequeue[Exit[Option[Status], Res]]): ZIO[Any, Status, Boolean] =
queue.take
.flatMap {
case None =>
queue.take.flatMap(
_.foldZIO(ZIO.succeed(false), ZIO.failCause(_), buffer.set(_) *> innerLoop(queue, buffer))
)
case Some(res) =>
call.sendMessage(res).as(true)
case Exit.Success(res) => call.sendMessage(res).as(true)
case Exit.Failure(cause) =>
cause.failureOrCause match {
case Left(Some(status)) => ZIO.fail(status)
case Left(None) => ZIO.succeed(false)
case Right(cause) => ZIO.failCause(cause)
}
}
.repeatWhileZIO(res => call.isReady.map(_ && res))

def outerLoop(queue: Dequeue[Take[Status, Res]])(buffer: Ref[Chunk[Res]]): ZIO[Any, Status, Boolean] =
(call.awaitReady *> innerLoop(queue, buffer))
def outerLoop(queue: Dequeue[Exit[Option[Status], Res]]): ZIO[Any, Status, Boolean] =
(call.awaitReady *> innerLoop(queue))
.repeatWhile(identity)

for {
queueSize <- backpressureQueueSize
_ <- ZIO.scoped(
_ <- ZIO.scoped[Any](
stream
.toQueue(queueSize)
.flatMap { queue =>
Ref.make[Chunk[Res]](Chunk.empty).flatMap(outerLoop(queue))
}
.toQueueOfElements(queueSize)
.flatMap(outerLoop)
)
} yield ()
}
Expand Down

0 comments on commit 5d2ebdb

Please sign in to comment.