Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Protobuf SerDes support #402

Merged
merged 1 commit into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions docs/advanced/serialization.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ Quix Streams supports multiple serialization formats to exchange data between Ka
- `double`
- `json`
- `avro`
- `protobuf`

The serialization settings are defined per-topic using these parameters of `Application.topic()` function:

Expand Down Expand Up @@ -103,3 +104,29 @@ app = Application(broker_address='localhost:9092', consumer_group='consumer')
input_topic = app.topic('input', value_deserializer=AvroDeserializer(schema=MY_SCHEMA))
output_topic = app.topic('output', value_serializer=AvroSerializer(schema=MY_SCHEMA))
```

## Protobuf
Protocol Buffers are language-neutral, platform-neutral extensible mechanisms for serializing structured data.

You can learn more about the Protocol buffers format [here](https://protobuf.dev/)
The Protobuf serializer and deserializer need to be passed explicitly.

In the current version, the schema must be provided manually.

> ***WARNING***: The protobuf serializer and deserializer requires the protobuf library.
> You can install quixstreams with the necessary dependencies using
> `pip install quixstreams[protobuf]`

```python
from quixstreams import Application
from quixstreams.models.serialize.protobuf import ProtobufSerializer, ProtobufDeserializer

from my_input_models_pb2 import InputProto
from my_output_models_pb2 import OutputProto

app = Application(broker_address='localhost:9092', consumer_group='consumer')
input_topic = app.topic('input', value_deserializer=ProtobufDeserializer(msg_type=InputProto))
output_topic = app.topic('output', value_serializer=ProtobufSerializer(msg_type=OutputProto))
```

By default the protobuf deserializer will deserialize the message to a python dictionary. Doing it has a big performance impact. You can disable this behavior by initializing the deserializer with `to_dict` set to `False`. The protobuf message object will then be used as the message value limiting the available StreamingDataframe API.
4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ Homepage = "https://github.com/quixio/quix-streams"

[project.optional-dependencies]
all = [
"fastavro>=1.8,<2.0"
"fastavro>=1.8,<2.0",
"protobuf>=5.27.2,<6.0"
]

avro = ["fastavro>=1.8,<2.0"]
protobuf = ["protobuf>=5.27.2,<6.0"]

[tool.setuptools.packages.find]
include = ["quixstreams*"]
Expand Down
100 changes: 100 additions & 0 deletions quixstreams/models/serializers/protobuf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
from typing import Union, Mapping, Iterable, Dict

from .base import Serializer, Deserializer, SerializationContext
from .exceptions import SerializationError

from google.protobuf.message import Message, DecodeError, EncodeError
from google.protobuf.json_format import MessageToDict, ParseDict, ParseError

__all__ = ("ProtobufSerializer", "ProtobufDeserializer")


class ProtobufSerializer(Serializer):
def __init__(
self,
msg_type: Message,
deterministic: bool = False,
ignore_unknown_fields: bool = False,
):
"""
Serializer that returns data in protobuf format.

Serialisation from a python dictionary can have a significant performance impact. An alternative is to pass the serializer an object of the `msg_type` class.

:param msg_type: protobuf message class.
:param deterministic: If true, requests deterministic serialization of the protobuf, with predictable ordering of map keys
Default - `False`
:param ignore_unknown_fields: If True, do not raise errors for unknown fields.
Default - `False`
"""
super().__init__()
self._msg_type = msg_type

self._deterministic = deterministic
self._ignore_unknown_fields = ignore_unknown_fields

def __call__(
self, value: Union[Dict, Message], ctx: SerializationContext
) -> Union[str, bytes]:

try:
if isinstance(value, self._msg_type):
return value.SerializeToString(deterministic=self._deterministic)

msg = self._msg_type()
return ParseDict(
value, msg, ignore_unknown_fields=self._ignore_unknown_fields
).SerializeToString(deterministic=self._deterministic)
except (EncodeError, ParseError) as exc:
raise SerializationError(str(exc)) from exc


class ProtobufDeserializer(Deserializer):
def __init__(
self,
msg_type: Message,
use_integers_for_enums: bool = False,
preserving_proto_field_name: bool = False,
to_dict: bool = True,
):
"""
Deserializer that parses protobuf data into a dictionary suitable for a StreamingDataframe.

Deserialisation to a python dictionary can have a significant performance impact. You can disable this behavior using `to_dict`, in that case the protobuf message will be used as the StreamingDataframe row value.

:param msg_type: protobuf message class.
:param use_integers_for_enums: If true, use integers instead of enum names.
Default - `False`
:param preserving_proto_field_name: If True, use the original proto field names as
defined in the .proto file. If False, convert the field names to
lowerCamelCase.
Default - `False`
:param to_dict: If false, return the protobuf message instead of a dict.
Default - `True`
"""
super().__init__()
self._msg_type = msg_type
self._to_dict = to_dict

self._use_integers_for_enums = use_integers_for_enums
self._preserving_proto_field_name = preserving_proto_field_name

def __call__(
self, value: bytes, ctx: SerializationContext
) -> Union[Iterable[Mapping], Mapping, Message]:
msg = self._msg_type()

try:
msg.ParseFromString(value)
except DecodeError as exc:
quentin-quix marked this conversation as resolved.
Show resolved Hide resolved
raise SerializationError(str(exc)) from exc

if not self._to_dict:
return msg

return MessageToDict(
msg,
always_print_fields_with_no_presence=True,
use_integers_for_enums=self._use_integers_for_enums,
preserving_proto_field_name=self._preserving_proto_field_name,
)
3 changes: 2 additions & 1 deletion tests/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ testcontainers==4.5.1; python_version >= '3.9'
pytest
requests>=2.32
docker>=7.1.0 # Required to use requests>=2.32
fastavro>=1.8,<2.0
fastavro>=1.8,<2.0
protobuf>=5.27.2
14 changes: 14 additions & 0 deletions tests/test_quixstreams/test_models/protobuf/test.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
syntax = "proto3";

package test;

enum TestEnum {
A = 0;
B = 1;
}

message Test {
string name = 1;
int32 id = 2;
TestEnum enum = 3;
}
46 changes: 46 additions & 0 deletions tests/test_quixstreams/test_models/protobuf/test_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

57 changes: 56 additions & 1 deletion tests/test_quixstreams/test_models/test_serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,16 @@
DoubleDeserializer,
StringDeserializer,
)
from .utils import int_to_bytes, float_to_bytes
from quixstreams.models.serializers.protobuf import (
ProtobufSerializer,
ProtobufDeserializer,
)

from quixstreams.models.serializers.avro import AvroDeserializer, AvroSerializer

from .utils import int_to_bytes, float_to_bytes
from .protobuf.test_pb2 import Test

AVRO_TEST_SCHEMA = {
"type": "record",
"name": "testschema",
Expand All @@ -31,6 +37,7 @@
],
}


dummy_context = SerializationContext(topic="topic")

JSONSCHEMA_TEST_SCHEMA = {
Expand Down Expand Up @@ -74,6 +81,22 @@ class TestSerializers:
b"\x06foo\xf6\x01",
),
(AvroSerializer(AVRO_TEST_SCHEMA), {"name": "foo"}, b"\x06foo\x00"),
(ProtobufSerializer(Test), {}, b""),
(ProtobufSerializer(Test), {"id": 3}, b"\x10\x03"),
(ProtobufSerializer(Test), {"name": "foo", "id": 2}, b"\n\x03foo\x10\x02"),
daniil-quix marked this conversation as resolved.
Show resolved Hide resolved
(ProtobufSerializer(Test), Test(name="foo", id=2), b"\n\x03foo\x10\x02"),
# Both values are supported for enum
(
ProtobufSerializer(Test),
{"name": "foo", "id": 2, "enum": "B"},
b"\n\x03foo\x10\x02\x18\x01",
),
(
ProtobufSerializer(Test),
{"name": "foo", "id": 2, "enum": 1},
b"\n\x03foo\x10\x02\x18\x01",
),
(ProtobufSerializer(Test), {"name": "foo"}, b"\n\x03foo"),
],
)
def test_serialize_success(self, serializer: Serializer, value, expected):
Expand Down Expand Up @@ -103,6 +126,7 @@ def test_serialize_success(self, serializer: Serializer, value, expected):
(AvroSerializer(AVRO_TEST_SCHEMA), {"foo": "foo", "id": 123}),
(AvroSerializer(AVRO_TEST_SCHEMA), {"id": 123}),
(AvroSerializer(AVRO_TEST_SCHEMA, strict=True), {"name": "foo"}),
(ProtobufSerializer(Test), {"bar": 3}),
],
)
def test_serialize_error(self, serializer: Serializer, value):
Expand Down Expand Up @@ -151,6 +175,36 @@ class TestDeserializers:
b"\x06foo\x00",
{"name": "foo", "id": 0},
),
(
ProtobufDeserializer(Test),
b"\n\x03foo\x10\x02",
{"enum": "A", "name": "foo", "id": 2},
),
(
ProtobufDeserializer(Test, to_dict=False),
b"\n\x03foo\x10\x02",
Test(name="foo", id=2),
),
(
ProtobufDeserializer(Test, use_integers_for_enums=True),
b"\n\x03foo\x10\x02",
{"enum": 0, "name": "foo", "id": 2},
),
(
ProtobufDeserializer(Test),
b"\n\x03foo",
{
"enum": "A",
"name": "foo",
"id": 0,
},
),
(
ProtobufDeserializer(Test),
b"\x10\x03",
{"enum": "A", "name": "", "id": 3},
),
(ProtobufDeserializer(Test), b"", {"enum": "A", "name": "", "id": 0}),
],
)
def test_deserialize_no_column_name_success(
Expand All @@ -176,6 +230,7 @@ def test_deserialize_no_column_name_success(
b'{"id":10}',
),
(AvroDeserializer(AVRO_TEST_SCHEMA), b"\x26foo\x00"),
(ProtobufDeserializer(Test), b"\n\x03foo\x10\x02\x13"),
],
)
def test_deserialize_error(self, deserializer: Deserializer, value):
Expand Down
Loading