-
-
Notifications
You must be signed in to change notification settings - Fork 286
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
scalapb-runtime-grpc: optimize Marshaller for InProcessTransport #1615
Changes from 1 commit
8575e52
8d8256c
dd57060
6c26c6a
cbf3331
1aa83f1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,10 +6,12 @@ import scalapb.{GeneratedMessage, GeneratedMessageCompanion, TypeMapper} | |
|
||
class Marshaller[T <: GeneratedMessage](companion: GeneratedMessageCompanion[T]) | ||
extends io.grpc.MethodDescriptor.Marshaller[T] { | ||
override def stream(t: T): InputStream = new ByteArrayInputStream(t.toByteArray) | ||
override def stream(t: T): InputStream = new ProtoInputStream[T](t, this) | ||
|
||
override def parse(inputStream: InputStream): T = | ||
companion.parseFrom(inputStream) | ||
override def parse(inputStream: InputStream): T = inputStream match { | ||
case pis: ProtoInputStream[T] if pis.marshaller == this => pis.message | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what's the reason for comparing the marshallers? if not needed, the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. removed in 8d8256c introduced it to reproduce the following precaution logic, however seems to be redundant indeed. |
||
case _ => companion.parseFrom(inputStream) | ||
} | ||
} | ||
|
||
class TypeMappedMarshaller[T <: GeneratedMessage, Custom]( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
package scalapb.grpc | ||
|
||
import com.google.protobuf.CodedOutputStream | ||
import scalapb.GeneratedMessage | ||
|
||
import java.io.{ByteArrayInputStream, InputStream} | ||
|
||
/** | ||
* Allows skipping serialization completely when the io.grpc.inprocess.InProcessTransport is used. | ||
* Inspired by https://github.com/grpc/grpc-java/blob/master/protobuf-lite/src/main/java/io/grpc/protobuf/lite/ProtoInputStream.java | ||
*/ | ||
class ProtoInputStream[T <: GeneratedMessage](msg: T, val marshaller: Marshaller[T]) extends InputStream { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you add unit tests for this class? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. added in 6c26c6a |
||
|
||
private var state: State = Message(msg) | ||
|
||
private sealed trait State { | ||
def message: T = throw new IllegalStateException("message not available") | ||
def available: Int | ||
def read(): Int | ||
def read(b: Array[Byte], off: Int, len: Int): Int | ||
} | ||
|
||
private object Drained extends State { | ||
override def available: Int = 0 | ||
override def read(): Int = -1 | ||
override def read(b: Array[Byte], off: Int, len: Int): Int = -1 | ||
} | ||
|
||
private case class Message(value: T) extends State { | ||
override def available: Int = value.serializedSize | ||
override def message: T = value | ||
override def read(): Int = toStream.read() | ||
override def read(b: Array[Byte], off: Int, len: Int): Int = { | ||
value.serializedSize match { | ||
case 0 => toDrained.read(b, off, len) | ||
case size if size <= len => | ||
val stream = CodedOutputStream.newInstance(b, off, size) | ||
message.writeTo(stream) | ||
stream.flush() | ||
stream.checkNoSpaceLeft() | ||
toDrained | ||
size | ||
case _ => toStream.read(b, off, len) | ||
} | ||
} | ||
private def toStream: State = { | ||
state = Stream(new ByteArrayInputStream(value.toByteArray)) | ||
state | ||
} | ||
private def toDrained: State = { | ||
state = Drained | ||
state | ||
} | ||
} | ||
|
||
private case class Stream(value: InputStream) extends State { | ||
override def available: Int = value.available() | ||
override def read(): Int = value.read() | ||
override def read(b: Array[Byte], off: Int, len: Int): Int = value.read(b, off, len) | ||
} | ||
|
||
override def read(): Int = state.read() | ||
override def read(b: Array[Byte], off: Int, len: Int): Int = state.read(b, off, len) | ||
override def available(): Int = state.available | ||
def message: T = state.message | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: align the whitespace?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dd57060