Skip to content

Commit

Permalink
Draft: Protobuf SerDes support
Browse files Browse the repository at this point in the history
  • Loading branch information
quentin-quix committed Jul 17, 2024
1 parent fea6dd3 commit 0942db8
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 1 deletion.
11 changes: 10 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "quixstreams"
dynamic = ["version", "dependencies", "optional-dependencies"]
dynamic = ["version", "dependencies"]
description = "Python library for building stream processing applications with Apache Kafka"
license = {file = "LICENSE"}
readme = "README.md"
Expand All @@ -23,6 +23,15 @@ classifiers = [
"Programming Language :: Python :: 3",
]

[project.optional-dependencies]
all = [
"protobuf"
]

protobuf = [
"protobuf"
]

[project.urls]
Homepage = "https://github.com/quixio/quix-streams"

Expand Down
56 changes: 56 additions & 0 deletions quixstreams/models/serializers/protobuf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
from typing import Union, Mapping, Iterable, Dict

from .base import Serializer, Deserializer, SerializationContext
from .exceptions import SerializationError

from google.protobuf.message import Message, DecodeError, EncodeError
from google.protobuf.json_format import MessageToDict, ParseDict, ParseError

__all__ = ("ProtobufSerializer", "ProtobufDeserializer")


class ProtobufSerializer(Serializer):
def __init__(
self,
msg_type: Message,
):
"""
Serializer that returns data in protobuf format.
:param msg_type: protobuf message class.
"""
super().__init__()
self._msg_type = msg_type

def __call__(self, value: Dict, ctx: SerializationContext) -> Union[str, bytes]:
msg = self._msg_type()

try:
return ParseDict(value, msg).SerializeToString(deterministic=True)
except (EncodeError, ParseError) as exc:
raise SerializationError(str(exc)) from exc


class ProtobufDeserializer(Deserializer):
def __init__(
self,
msg_type: Message,
):
"""
Deserializer that parses protobuf data.
:param msg_type: protobuf message class.
"""
super().__init__()
self._msg_type = msg_type

def __call__(
self, value: bytes, ctx: SerializationContext
) -> Union[Iterable[Mapping], Mapping]:
msg = self._msg_type()

try:
msg.ParseFromString(value)
return MessageToDict(msg)
except DecodeError as exc:
raise SerializationError(str(exc)) from exc
1 change: 1 addition & 0 deletions tests/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ testcontainers==4.5.1; python_version >= '3.9'
pytest
requests>=2.32
docker>=7.1.0 # Required to use requests>=2.32
protobuf>=5.27.2
8 changes: 8 additions & 0 deletions tests/test_quixstreams/test_models/protobuf/test.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
syntax = "proto3";

package test;

message Test {
string name = 1;
int32 id = 2;
}
38 changes: 38 additions & 0 deletions tests/test_quixstreams/test_models/protobuf/test_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions tests/test_quixstreams/test_models/test_serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,14 @@
DoubleDeserializer,
StringDeserializer,
)
from quixstreams.models.serializers.protobuf import (
ProtobufSerializer,
ProtobufDeserializer,
)
from .utils import int_to_bytes, float_to_bytes

from .protobuf.test_pb2 import Test

dummy_context = SerializationContext(topic="topic")


Expand All @@ -34,6 +40,10 @@ class TestSerializers:
(BytesSerializer(), b"abc", b"abc"),
(JSONSerializer(), {"a": 123}, b'{"a":123}'),
(JSONSerializer(), [1, 2, 3], b"[1,2,3]"),
(ProtobufSerializer(Test), {}, b""),
(ProtobufSerializer(Test), {"id": 3}, b"\x10\x03"),
(ProtobufSerializer(Test), {"name": "foo", "id": 2}, b"\n\x03foo\x10\x02"),
(ProtobufSerializer(Test), {"name": "foo"}, b"\n\x03foo"),
],
)
def test_serialize_success(self, serializer: Serializer, value, expected):
Expand All @@ -50,6 +60,7 @@ def test_serialize_success(self, serializer: Serializer, value, expected):
(StringSerializer(), {"a": 123}),
(JSONSerializer(), object()),
(JSONSerializer(), complex(1, 2)),
(ProtobufSerializer(Test), {"bar": 3}),
],
)
def test_serialize_error(self, serializer: Serializer, value):
Expand All @@ -70,6 +81,14 @@ class TestDeserializers:
(BytesDeserializer(), b"123123", b"123123"),
(JSONDeserializer(), b"123123", 123123),
(JSONDeserializer(), b'{"a":"b"}', {"a": "b"}),
(
ProtobufDeserializer(Test),
b"\n\x03foo\x10\x02",
{"name": "foo", "id": 2},
),
(ProtobufDeserializer(Test), b"\n\x03foo", {"name": "foo"}),
(ProtobufDeserializer(Test), b"\x10\x03", {"id": 3}),
(ProtobufDeserializer(Test), b"", {}),
],
)
def test_deserialize_no_column_name_success(
Expand All @@ -84,6 +103,7 @@ def test_deserialize_no_column_name_success(
(IntegerDeserializer(), b'{"abc": "abc"}'),
(DoubleDeserializer(), b"abc"),
(JSONDeserializer(), b"{"),
(ProtobufDeserializer(Test), b"\n\x03foo\x10\x02\x13"),
],
)
def test_deserialize_error(self, deserializer: Deserializer, value):
Expand Down

0 comments on commit 0942db8

Please sign in to comment.