Add MessagePack item serializer and validator #628

Open · wants to merge 31 commits into base: main

Changes from 21 commits
8395a35
Add msgpack item serializer and validator
jarmuszz Aug 11, 2024
f051c31
Merge branch 'gnieh:main' into main
jarmuszz Aug 11, 2024
52b48d7
Fix one additional argument being passed
jarmuszz Aug 13, 2024
7b4246e
Add tests to cover all fmts for msgpack serializer
jarmuszz Aug 13, 2024
9bc1452
Remove debug code in serializer test
jarmuszz Aug 13, 2024
eed74a4
Add validation test cases
jarmuszz Aug 15, 2024
54d7d55
Make msgpack item serializer omit leading zeros
jarmuszz Aug 17, 2024
1ed1f73
Make Extension tests use `ByteVector.fill`
jarmuszz Aug 18, 2024
ac14528
Refine msgpack fixpoint test
jarmuszz Aug 18, 2024
671e693
Remove redundant `padLeft`s when size is known
jarmuszz Aug 18, 2024
b732583
Reformat ValidationSpec.scala
jarmuszz Aug 18, 2024
3f30e33
Remove scaladoc from an embedded function
jarmuszz Aug 18, 2024
aa8658a
Add benchmarks for msgpack item serializer
jarmuszz Aug 24, 2024
cd9782e
Merge msgpack serializers
jarmuszz Sep 5, 2024
cdc4894
Make `SerializerSpec` no longer extend `Checkers`
jarmuszz Sep 7, 2024
309569e
Make `msgpack.low` API similar to `cbor.low` API
jarmuszz Sep 7, 2024
3d717a3
Update msgpack serializer spec documentation
jarmuszz Sep 7, 2024
deede3f
Change `msgpack.low.toBinary` scaladoc
jarmuszz Sep 10, 2024
698f727
Fix msgpack doc generation
jarmuszz Sep 10, 2024
248fbc6
Add doc for `msgpack.low` public methods
jarmuszz Sep 10, 2024
4760221
Run prePR
jarmuszz Sep 10, 2024
041e135
Extract literals into constants
jarmuszz Sep 14, 2024
2be8831
Fix msgpack serialization test of negative fixint
jarmuszz Sep 14, 2024
fd845e8
Make msgpack Array and Map use Long for sizes
jarmuszz Sep 21, 2024
482bf9e
Make msgpack exceptions public
jarmuszz Sep 22, 2024
8d67768
Move Pull.pure(None) into a constant
jarmuszz Sep 22, 2024
05c4c1c
Use bit shifts instead of `Math.pow(2, n)`
jarmuszz Sep 22, 2024
fce1083
Use `.redeem` in msgpack validation spec
jarmuszz Sep 23, 2024
989ec8a
Use binary data in msgpack serializer benchmark
jarmuszz Oct 27, 2024
59d5e3f
Use binary data in msgpack parser benchmark
jarmuszz Oct 30, 2024
9e787e5
Merge branch 'gnieh:main' into main
jarmuszz Nov 15, 2024
@@ -0,0 +1,71 @@
/*
* Copyright 2024 fs2-data Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fs2
package data.benchmarks

import java.util.concurrent.TimeUnit
import org.openjdk.jmh.annotations._

import cats.effect.SyncIO

import scodec.bits._
import fs2._

@OutputTimeUnit(TimeUnit.MICROSECONDS)
@BenchmarkMode(Array(Mode.AverageTime))
@State(org.openjdk.jmh.annotations.Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 3, time = 2)
@Measurement(iterations = 10, time = 2)
class MsgPackItemSerializerBenchmarks {
  val msgpackItems: List[fs2.data.msgpack.low.MsgpackItem] = {
    val bytes =
      fs2.io
        .readClassLoaderResource[SyncIO]("twitter_msgpack.txt", 4096)
        .through(fs2.text.utf8.decode)
        .compile
        .string
        .map(ByteVector.fromHex(_).get)
        .unsafeRunSync()

    Stream
      .chunk(Chunk.byteVector(bytes))
      .through(fs2.data.msgpack.low.items[SyncIO])
      .compile
      .toList
      .unsafeRunSync()
  }

Collaborator:

If I'm not mistaken, this destroys all chunking: you'll work with one gigantic Chunk, which is not what real-life code should usually do. I think you can simply combine those two streams, roughly along the lines of

    fs2.io
      .readClassLoaderResource[SyncIO]("twitter_msgpack.txt", 4096)
      .through(fs2.text.utf8.decode)
      .map(str => Chunk.byteVector(ByteVector.fromHex(str).get))
      .unchunks
      .through(fs2.data.msgpack.low.items[SyncIO])
      .compile
      .toList
      .unsafeRunSync()

Contributor Author (@jarmuszz, Oct 27, 2024):

Because the twitter_msgpack.txt file contained space-separated hex representations of bytes, the fs2.text.utf8.decode pipe could slice some bytes in half between two chunks. E.g. "A1 B2 C3" would be converted into Chunk(0x0a, 0x1b) and Chunk(0x02, 0xc3) if our byte stream had a chunk size equal to 4.

We could remove spaces from the input data, but even then we would rely on the chunk size being a multiple of two, which I think is a bit fragile and hard to debug.

Instead, I converted the test data into a raw binary form which is loaded into memory as a chunked stream before the benchmarks are run. The implementation looks a bit clunky but I'm not aware of any better way to achieve this.
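The slicing problem described above can be reproduced without fs2 at all. The sketch below (hypothetical, not part of the PR; `HexChunkDemo` and `decodeChunk` are illustrative names) parses the same "A1 B2 C3" input whole and then in fixed 4-character chunks, showing how a byte whose two hex digits straddle a chunk boundary gets mangled:

```scala
// Standalone demo: per-chunk hex decoding breaks when a byte's two hex
// digits land in different chunks, which is why the benchmark data was
// converted to raw binary.
object HexChunkDemo {
  // Decode one text chunk on its own, the way a naive per-chunk decoder
  // would: strip whitespace, then read two hex digits per byte.
  def decodeChunk(s: String): List[Int] =
    s.filterNot(_.isWhitespace).grouped(2).map(Integer.parseInt(_, 16)).toList

  val input = "A1 B2 C3"

  // Decoding the whole string at once yields the intended three bytes.
  val whole: List[Int] = decodeChunk(input)

  // Decoding in 4-character chunks ("A1 B" and "2 C3") splits "B2" in half
  // and produces four wrong values instead.
  val chunked: List[Int] = input.grouped(4).toList.flatMap(decodeChunk)

  def main(args: Array[String]): Unit = {
    println(whole.map(b => f"$b%02x").mkString(" "))   // a1 b2 c3
    println(chunked.map(b => f"$b%02x").mkString(" ")) // a1 0b 2c 03
  }
}
```

Stripping the spaces would not help either: the same experiment with "A1B2C3" and an odd chunk size shifts every subsequent byte, which matches the author's point that the fix would rely on the chunk size being a multiple of two.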


  @Benchmark
  def serialize() =
    Stream
      .emits(msgpackItems)
      .through(fs2.data.msgpack.low.toNonValidatedBinary[SyncIO])
      .compile
      .drain
      .unsafeRunSync()

  @Benchmark
  def withValidation() =
    Stream
      .emits(msgpackItems)
      .through(fs2.data.msgpack.low.toBinary[SyncIO])
      .compile
      .drain
      .unsafeRunSync()
}
@@ -0,0 +1,247 @@
/*
* Copyright 2024 fs2-data Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fs2
package data
package msgpack
package low
package internal

import scodec.bits._

private[low] object ItemSerializer {
  class MalformedItemError extends Error("item exceeds the maximum size of its format")
  class MalformedStringError extends MalformedItemError
  class MalformedBinError extends MalformedItemError
  class MalformedIntError extends MalformedItemError
  class MalformedUintError extends MalformedItemError

  /** Checks whether integer `x` fits in `n` bits. */
  @inline
  private def fitsIn(x: Int, n: Long): Boolean =
    java.lang.Integer.compareUnsigned(x, (Math.pow(2, n.toDouble).toLong - 1).toInt) <= 0
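A later commit in this PR ("Use bit shifts instead of `Math.pow(2, n)`") replaces the floating-point computation with a shifted mask. A dependency-free sketch of the equivalent check (`fitsInShifted` is an illustrative name, not the PR's):

```scala
// The largest unsigned n-bit value is (1L << n) - 1; the unsigned
// comparison then asks whether x is at most that mask. Valid for n in
// 1..32: for n = 32 the mask truncates to 0xffffffff (-1 as an Int),
// which is the unsigned maximum, so every x fits.
object FitsInDemo {
  def fitsInShifted(x: Int, n: Long): Boolean =
    java.lang.Integer.compareUnsigned(x, ((1L << n) - 1).toInt) <= 0
}
```

For example, `fitsInShifted(15, 4)` holds while `fitsInShifted(16, 4)` does not, matching the fixarray bound used further down.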

  private case class SerializationContext[F[_]](out: Out[F],
                                                chunk: Chunk[MsgpackItem],
                                                idx: Int,
                                                rest: Stream[F, MsgpackItem])

  /** Buffers [[Chunk]]s into 4KiB segments before calling [[Pull.output]].
    *
    * @param contents buffered [[Chunk]]
    */
  private class Out[F[_]](contents: Chunk[Byte]) {
Member:

I like the approach; however, this creates a new Out instance for each incoming item. If you run the benchmark with allocation and CPU flame graphs, does it appear to be a bottleneck?

Contributor Author:

It doesn't seem like a significant bottleneck to me.

Here's a screenshot from the VisualVM memory profiler:
[image]

I cannot find any reference to Out's methods in the CPU samples, so either I'm doing something wrong or these methods get optimized out :)

Member:

I was thinking about running the benchmark with async-profiler outputting a flame graph. Something like:

    $ sbt
    > project benchmarksJVM
    > Jmh/run fs2.data.benchmarks.MsgPackItemParserBenchmarks -prof "async:libPath=/path/to/libasyncProfiler.so;output=flamegraph;event=cpu;alluser=true"

Contributor Author:

It takes around 10% of the CPU and 17% of the allocation graphs.

Making the Out class an implicit class around Chunk[Byte] lowers the results to 7% and 15% respectively. I'm not sure how we could optimize this further.

[image]
[image]

Member:

Time and memory seem to be dominated by ByteVector creation at every step. I need to review these creations again, and why they occur. But I could live with merging the current state and improving later on.
    private val limit = 4096

    /** Pushes `bv` into the buffer and emits the buffer if it reaches the limit. */
    @inline
    def push(bv: ByteVector): Pull[F, Byte, Out[F]] =
      if (contents.size >= limit)
        Pull.output(contents).as(new Out(Chunk.byteVector(bv)))
      else
        Pull.done.as(new Out(contents ++ Chunk.byteVector(bv)))

    /** Splices `bv` into segments and pushes them into the buffer while emitting the buffer at the same time so
      * that it never exceeds the limit during the operation.
      *
      * Use this instead of [[Out.push]] when `bv` may significantly exceed 4KiB.
      */
    def pushBuffered(bv: ByteVector): Pull[F, Byte, Out[F]] = {
      @inline
      def go(chunk: Chunk[Byte], rest: ByteVector): Pull[F, Byte, Out[F]] =
        if (rest.isEmpty)
          Pull.done.as(new Out(chunk))
        else
          Pull.output(chunk) >> go(Chunk.byteVector(rest.take(limit.toLong)), rest.drop(limit.toLong))

      if (bv.isEmpty)
        this.push(bv)
      else if (contents.size >= limit)
        Pull.output(contents) >> go(Chunk.byteVector(bv.take(limit.toLong)), bv.drop(limit.toLong))
      else
        go(contents ++ Chunk.byteVector(bv.take(limit.toLong - contents.size)), bv.drop(limit.toLong - contents.size))
    }

    /** Outputs the whole buffer. */
    @inline
    def flush = Pull.output(contents)
  }
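The buffering idea behind `Out` can be sketched without fs2's `Pull` machinery. In this simplified model (names `Buffer`, `emitted` are illustrative, and "emitting" is modeled as appending to a list rather than outputting to a stream), bytes accumulate until the limit is reached and are then handed off as bounded segments:

```scala
// fs2-free sketch of the Out buffering strategy.
final class Buffer(limit: Int, contents: Vector[Byte], emitted: List[Vector[Byte]]) {
  // Like Out.push: emit the accumulated buffer once it has reached the
  // limit, otherwise keep accumulating.
  def push(bv: Vector[Byte]): Buffer =
    if (contents.size >= limit) new Buffer(limit, bv, emitted :+ contents)
    else new Buffer(limit, contents ++ bv, emitted)

  // Like Out.pushBuffered: slice large input so that no emitted segment
  // exceeds the limit; the partial tail stays buffered.
  def pushBuffered(bv: Vector[Byte]): Buffer = {
    val segments = (contents ++ bv).grouped(limit).toList
    if (segments.isEmpty) this
    else new Buffer(limit, segments.last, emitted ++ segments.init)
  }

  // Like Out.flush: hand back everything, including the partial tail.
  def flush: List[Vector[Byte]] = emitted :+ contents
}

object Buffer {
  def empty(limit: Int): Buffer = new Buffer(limit, Vector.empty, Nil)
}
```

As with the real `Out.push`, a single `push` of an oversized vector can produce a segment larger than the limit, which is exactly why `pushBuffered` exists for the str32/bin32/ext32 cases below.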

  @inline
  private def step[F[_]: RaiseThrowable](o: Out[F], item: MsgpackItem): Pull[F, Byte, Out[F]] = item match {
    case MsgpackItem.UnsignedInt(bytes) =>
      val bs = bytes.dropWhile(_ == 0)
      if (bs.size <= 1)
        o.push(ByteVector(Headers.Uint8) ++ bs.padLeft(1))
      else if (bs.size <= 2)
        o.push(ByteVector(Headers.Uint16) ++ bs.padLeft(2))
      else if (bs.size <= 4)
        o.push(ByteVector(Headers.Uint32) ++ bs.padLeft(4))
      else if (bs.size <= 8)
        o.push(ByteVector(Headers.Uint64) ++ bs.padLeft(8))
      else
        Pull.raiseError(new MalformedUintError)

    case MsgpackItem.SignedInt(bytes) =>
      val bs = bytes.dropWhile(_ == 0)
      if (bs.size <= 1)
        // positive fixint (0x00 to 0x7f) or negative fixint (0xe0 to 0xff)
        if ((bs & hex"7f") == bs || (bs & hex"e0") == hex"e0")
          o.push(bs.padLeft(1))
        else
          o.push(ByteVector(Headers.Int8) ++ bs.padLeft(1))
      else if (bs.size <= 2)
        o.push(ByteVector(Headers.Int16) ++ bs.padLeft(2))
      else if (bs.size <= 4)
        o.push(ByteVector(Headers.Int32) ++ bs.padLeft(4))
      else if (bs.size <= 8)
        o.push(ByteVector(Headers.Int64) ++ bs.padLeft(8))
      else
        Pull.raiseError(new MalformedIntError)

    case MsgpackItem.Float32(float) =>
      o.push(ByteVector(Headers.Float32) ++ ByteVector.fromInt(java.lang.Float.floatToIntBits(float)))

    case MsgpackItem.Float64(double) =>
      o.push(ByteVector(Headers.Float64) ++ ByteVector.fromLong(java.lang.Double.doubleToLongBits(double)))

    case MsgpackItem.Str(bytes) =>
      if (bytes.size <= 31) {
        o.push(ByteVector.fromByte((0xa0 | bytes.size).toByte) ++ bytes)
      } else if (bytes.size <= Math.pow(2, 8) - 1) {
        val size = ByteVector.fromByte(bytes.size.toByte)
        o.push(ByteVector(Headers.Str8) ++ size ++ bytes)
      } else if (bytes.size <= Math.pow(2, 16) - 1) {
        val size = ByteVector.fromShort(bytes.size.toShort)
        o.push(ByteVector(Headers.Str16) ++ size ++ bytes)
      } else if (fitsIn(bytes.size.toInt, 32)) {
        val size = ByteVector.fromInt(bytes.size.toInt)
        /* Max length of str32 (incl. type and length info) is 2^32 + 4 bytes
         * which is more than Chunk can handle at once.
         */
        o.pushBuffered(ByteVector(Headers.Str32) ++ size ++ bytes)
      } else {
        Pull.raiseError(new MalformedStringError)
      }

    case MsgpackItem.Bin(bytes) =>
      if (bytes.size <= Math.pow(2, 8) - 1) {
        val size = ByteVector.fromByte(bytes.size.toByte)
        o.push(ByteVector(Headers.Bin8) ++ size ++ bytes)
      } else if (bytes.size <= Math.pow(2, 16) - 1) {
        val size = ByteVector.fromShort(bytes.size.toShort)
        o.push(ByteVector(Headers.Bin16) ++ size ++ bytes)
      } else if (fitsIn(bytes.size.toInt, 32)) {
        val size = ByteVector.fromInt(bytes.size.toInt)
        /* Max length of bin32 (incl. type and length info) is 2^32 + 4 bytes
         * which is more than Chunk can handle at once.
         */
        o.pushBuffered(ByteVector(Headers.Bin32) ++ size ++ bytes)
      } else {
        Pull.raiseError(new MalformedBinError)
      }

    case MsgpackItem.Array(size) =>
      if (fitsIn(size, 4)) {
        o.push(ByteVector.fromByte((0x90 | size).toByte))
      } else if (size <= Math.pow(2, 16) - 1) {
        val s = ByteVector.fromShort(size.toShort)
        o.push(ByteVector(Headers.Array16) ++ s)
      } else {
        val s = ByteVector.fromInt(size)
        o.push(ByteVector(Headers.Array32) ++ s)
      }

    case MsgpackItem.Map(size) =>
      if (size <= 15) {
        o.push(ByteVector.fromByte((0x80 | size).toByte))
      } else if (size <= Math.pow(2, 16) - 1) {
        val s = ByteVector.fromShort(size.toShort)
        o.push(ByteVector(Headers.Map16) ++ s)
      } else {
        val s = ByteVector.fromInt(size)
        o.push(ByteVector(Headers.Map32) ++ s)
      }

    case MsgpackItem.Extension(tpe, bytes) =>
      val bs = bytes.dropWhile(_ == 0)
      if (bs.size <= 1) {
        o.push((ByteVector(Headers.FixExt1) :+ tpe) ++ bs.padLeft(1))
      } else if (bs.size <= 2) {
        o.push((ByteVector(Headers.FixExt2) :+ tpe) ++ bs.padLeft(2))
      } else if (bs.size <= 4) {
        o.push((ByteVector(Headers.FixExt4) :+ tpe) ++ bs.padLeft(4))
      } else if (bs.size <= 8) {
        o.push((ByteVector(Headers.FixExt8) :+ tpe) ++ bs.padLeft(8))
      } else if (bs.size <= 16) {
        o.push((ByteVector(Headers.FixExt16) :+ tpe) ++ bs.padLeft(16))
      } else if (bs.size <= Math.pow(2, 8) - 1) {
        val size = ByteVector.fromByte(bs.size.toByte)
        o.push((ByteVector(Headers.Ext8) ++ size :+ tpe) ++ bs)
      } else if (bs.size <= Math.pow(2, 16) - 1) {
        val size = ByteVector.fromShort(bs.size.toShort)
        o.push((ByteVector(Headers.Ext16) ++ size :+ tpe) ++ bs)
      } else {
        val size = ByteVector.fromInt(bs.size.toInt)
        /* Max length of ext32 (incl. type and length info) is 2^32 + 5 bytes
         * which is more than Chunk can handle at once.
         */
        o.pushBuffered((ByteVector(Headers.Ext32) ++ size :+ tpe) ++ bs)
      }

    case MsgpackItem.Timestamp32(seconds) =>
      o.push((ByteVector(Headers.FixExt4) :+ Headers.Timestamp.toByte) ++ ByteVector.fromInt(seconds))

    case MsgpackItem.Timestamp64(combined) =>
      o.push((ByteVector(Headers.FixExt8) :+ Headers.Timestamp.toByte) ++ ByteVector.fromLong(combined))

    case MsgpackItem.Timestamp96(nanoseconds, seconds) =>
      val ns = ByteVector.fromInt(nanoseconds)
      val s = ByteVector.fromLong(seconds)
      o.push((ByteVector(Headers.Ext8) :+ 12 :+ Headers.Timestamp.toByte) ++ ns ++ s)

    case MsgpackItem.Nil =>
      o.push(ByteVector(Headers.Nil))

    case MsgpackItem.False =>
      o.push(ByteVector(Headers.False))

    case MsgpackItem.True =>
      o.push(ByteVector(Headers.True))
  }

  private def stepChunk[F[_]: RaiseThrowable](ctx: SerializationContext[F]): Pull[F, Byte, SerializationContext[F]] =
    if (ctx.idx >= ctx.chunk.size)
      Pull.done.as(ctx)
    else
      step(ctx.out, ctx.chunk(ctx.idx)).flatMap { out =>
        stepChunk(SerializationContext(out, ctx.chunk, ctx.idx + 1, ctx.rest))
      }

  def pipe[F[_]: RaiseThrowable]: Pipe[F, MsgpackItem, Byte] = { stream =>
    def go(out: Out[F], rest: Stream[F, MsgpackItem]): Pull[F, Byte, Unit] =
      rest.pull.uncons.flatMap {
        case None => out.flush
        case Some((chunk, rest)) =>
          stepChunk(SerializationContext(out, chunk, 0, rest)).flatMap { case SerializationContext(out, _, _, rest) =>
            go(out, rest)
          }
      }

    go(new Out(Chunk.empty), stream).stream
  }
}
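The fixint test in the `SignedInt` case above relies on the single-byte layouts of the MessagePack format: positive fixint occupies 0x00 to 0x7f (values 0..127, high bit clear) and negative fixint 0xe0 to 0xff (values -32..-1, top three bits set); any other one-byte value needs an int8 header. A dependency-free sketch of that classification (object and method names are illustrative, not the PR's):

```scala
// Single-byte MessagePack encodings: a value can be emitted bare exactly
// when it is a positive or negative fixint.
object FixintDemo {
  // 0x00..0x7f: high bit clear.
  def isPositiveFixint(b: Byte): Boolean = (b & 0x80) == 0x00
  // 0xe0..0xff: top three bits set (values -32..-1).
  def isNegativeFixint(b: Byte): Boolean = (b & 0xe0) == 0xe0

  def isFixint(b: Byte): Boolean = isPositiveFixint(b) || isNegativeFixint(b)
}
```

For example, 127 and -1 are fixints, while -33 and -128 fall outside both ranges and must be encoded with the int8 header (0xd0) followed by the value byte.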