Skip to content

Commit

Permalink
Protobuf SerDes support
Browse files Browse the repository at this point in the history
  • Loading branch information
quentin-quix committed Jul 17, 2024
1 parent fea6dd3 commit a3fc56a
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 1 deletion.
11 changes: 10 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "quixstreams"
dynamic = ["version", "dependencies", "optional-dependencies"]
dynamic = ["version", "dependencies"]
description = "Python library for building stream processing applications with Apache Kafka"
license = {file = "LICENSE"}
readme = "README.md"
Expand All @@ -23,6 +23,15 @@ classifiers = [
"Programming Language :: Python :: 3",
]

[project.optional-dependencies]
all = [
"protobuf"
]

protobuf = [
"protobuf"
]

[project.urls]
Homepage = "https://github.com/quixio/quix-streams"

Expand Down
66 changes: 66 additions & 0 deletions quixstreams/models/serializers/protobuf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
from typing import Dict, Iterable, Mapping, Type, Union

from google.protobuf.json_format import MessageToDict, ParseDict, ParseError
from google.protobuf.message import DecodeError, EncodeError, Message

from .base import Deserializer, SerializationContext, Serializer
from .exceptions import SerializationError

__all__ = ("ProtobufSerializer", "ProtobufDeserializer")


class ProtobufSerializer(Serializer):
    """
    Serializer that converts a dictionary into a protobuf-encoded message.

    The dictionary is parsed into a fresh instance of the configured message
    class and then serialized.
    """

    def __init__(self, msg_type: Type[Message], deterministic: bool = False):
        """
        Serializer that returns data in protobuf format.

        :param msg_type: protobuf message class (the class itself, not an
            instance); it is instantiated once per serialized value.
        :param deterministic: If true, requests deterministic serialization of
            the protobuf, with predictable ordering of map keys.
            Default - `False`
        """
        super().__init__()
        self._msg_type = msg_type
        self._deterministic = deterministic

    def __call__(self, value: Dict, ctx: SerializationContext) -> Union[str, bytes]:
        """
        Serialize `value` into the wire format of the configured message type.

        :param value: dictionary whose keys are field names of the message.
        :param ctx: serialization context; not used by this serializer.
        :return: the result of `SerializeToString()` on the populated message.
        :raises SerializationError: if the dictionary cannot be parsed into the
            message type or the message cannot be encoded.
        """
        msg = self._msg_type()

        try:
            return ParseDict(value, msg).SerializeToString(
                deterministic=self._deterministic
            )
        # TypeError is included because a non-iterable `value` (e.g. `None`)
        # fails inside `ParseDict` before it can be reported as a ParseError;
        # callers should still get the serializer's own error type.
        except (EncodeError, ParseError, TypeError) as exc:
            raise SerializationError(str(exc)) from exc


class ProtobufDeserializer(Deserializer):
    """
    Deserializer that decodes protobuf-encoded bytes into a dictionary.
    """

    def __init__(
        self,
        msg_type: Type[Message],
        use_integers_for_enums: bool = False,
    ):
        """
        Deserializer that parses protobuf data into a dictionary suitable for a StreamingDataframe.

        :param msg_type: protobuf message class (the class itself, not an
            instance); it is instantiated once per deserialized value.
        :param use_integers_for_enums: If true, use integers instead of enum names.
            Default - `False`
        """
        super().__init__()
        self._msg_type = msg_type
        self._use_integers_for_enums = use_integers_for_enums

    def __call__(
        self, value: bytes, ctx: SerializationContext
    ) -> Union[Iterable[Mapping], Mapping]:
        """
        Decode `value` with the configured message type and convert it to a dict.

        :param value: protobuf-encoded payload.
        :param ctx: serialization context; not used by this deserializer.
        :return: dictionary representation of the decoded message.
        :raises SerializationError: if the payload cannot be decoded.
        """
        msg = self._msg_type()

        try:
            msg.ParseFromString(value)
            return MessageToDict(
                msg,
                # Emit fields that were absent from the payload with their
                # default values, so the resulting dict always has the same keys.
                # NOTE(review): this keyword appears to exist only in recent
                # protobuf releases (tests pin protobuf>=5.27.2) - confirm the
                # minimum supported version.
                always_print_fields_with_no_presence=True,
                use_integers_for_enums=self._use_integers_for_enums,
            )
        except DecodeError as exc:
            raise SerializationError(str(exc)) from exc
1 change: 1 addition & 0 deletions tests/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ testcontainers==4.5.1; python_version >= '3.9'
pytest
requests>=2.32
docker>=7.1.0 # Required to use requests>=2.32
protobuf>=5.27.2
14 changes: 14 additions & 0 deletions tests/test_quixstreams/test_models/protobuf/test.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
syntax = "proto3";

package test;

// Two-value enum used as the type of the `enum` field in the `Test` message.
enum TestEnum {
// Zero value; reported for the field when it is absent from the payload.
A = 0;
B = 1;
}

// Fixture message with one string, one int32 and one enum field.
message Test {
string name = 1;
int32 id = 2;
// Field is named `enum` and typed with the TestEnum declared in this file.
TestEnum enum = 3;
}
46 changes: 46 additions & 0 deletions tests/test_quixstreams/test_models/protobuf/test_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

48 changes: 48 additions & 0 deletions tests/test_quixstreams/test_models/test_serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,14 @@
DoubleDeserializer,
StringDeserializer,
)
from quixstreams.models.serializers.protobuf import (
ProtobufSerializer,
ProtobufDeserializer,
)
from .utils import int_to_bytes, float_to_bytes

from .protobuf.test_pb2 import Test

dummy_context = SerializationContext(topic="topic")


Expand All @@ -34,6 +40,21 @@ class TestSerializers:
(BytesSerializer(), b"abc", b"abc"),
(JSONSerializer(), {"a": 123}, b'{"a":123}'),
(JSONSerializer(), [1, 2, 3], b"[1,2,3]"),
(ProtobufSerializer(Test), {}, b""),
(ProtobufSerializer(Test), {"id": 3}, b"\x10\x03"),
(ProtobufSerializer(Test), {"name": "foo", "id": 2}, b"\n\x03foo\x10\x02"),
# Enum fields accept either the value name ("B") or its integer number (1)
(
ProtobufSerializer(Test),
{"name": "foo", "id": 2, "enum": "B"},
b"\n\x03foo\x10\x02\x18\x01",
),
(
ProtobufSerializer(Test),
{"name": "foo", "id": 2, "enum": 1},
b"\n\x03foo\x10\x02\x18\x01",
),
(ProtobufSerializer(Test), {"name": "foo"}, b"\n\x03foo"),
],
)
def test_serialize_success(self, serializer: Serializer, value, expected):
Expand All @@ -50,6 +71,7 @@ def test_serialize_success(self, serializer: Serializer, value, expected):
(StringSerializer(), {"a": 123}),
(JSONSerializer(), object()),
(JSONSerializer(), complex(1, 2)),
(ProtobufSerializer(Test), {"bar": 3}),
],
)
def test_serialize_error(self, serializer: Serializer, value):
Expand All @@ -70,6 +92,31 @@ class TestDeserializers:
(BytesDeserializer(), b"123123", b"123123"),
(JSONDeserializer(), b"123123", 123123),
(JSONDeserializer(), b'{"a":"b"}', {"a": "b"}),
(
ProtobufDeserializer(Test),
b"\n\x03foo\x10\x02",
{"enum": "A", "name": "foo", "id": 2},
),
(
ProtobufDeserializer(Test, use_integers_for_enums=True),
b"\n\x03foo\x10\x02",
{"enum": 0, "name": "foo", "id": 2},
),
(
ProtobufDeserializer(Test),
b"\n\x03foo",
{
"enum": "A",
"name": "foo",
"id": 0,
},
),
(
ProtobufDeserializer(Test),
b"\x10\x03",
{"enum": "A", "name": "", "id": 3},
),
(ProtobufDeserializer(Test), b"", {"enum": "A", "name": "", "id": 0}),
],
)
def test_deserialize_no_column_name_success(
Expand All @@ -84,6 +131,7 @@ def test_deserialize_no_column_name_success(
(IntegerDeserializer(), b'{"abc": "abc"}'),
(DoubleDeserializer(), b"abc"),
(JSONDeserializer(), b"{"),
(ProtobufDeserializer(Test), b"\n\x03foo\x10\x02\x13"),
],
)
def test_deserialize_error(self, deserializer: Deserializer, value):
Expand Down

0 comments on commit a3fc56a

Please sign in to comment.