From 7a982dc3329a3ac4b3f15621a885a5821c910a8b Mon Sep 17 00:00:00 2001 From: Quentin Dawans Date: Tue, 23 Jul 2024 16:40:31 +0200 Subject: [PATCH] Protobuf SerDes support --- docs/advanced/serialization.md | 27 +++++ pyproject.toml | 4 +- quixstreams/models/serializers/protobuf.py | 100 ++++++++++++++++++ tests/requirements.txt | 3 +- .../test_models/protobuf/test.proto | 14 +++ .../test_models/protobuf/test_pb2.py | 46 ++++++++ .../test_models/test_serializers.py | 57 +++++++++- 7 files changed, 248 insertions(+), 3 deletions(-) create mode 100644 quixstreams/models/serializers/protobuf.py create mode 100644 tests/test_quixstreams/test_models/protobuf/test.proto create mode 100644 tests/test_quixstreams/test_models/protobuf/test_pb2.py diff --git a/docs/advanced/serialization.md b/docs/advanced/serialization.md index 6833b0edb..0ffabbdaa 100644 --- a/docs/advanced/serialization.md +++ b/docs/advanced/serialization.md @@ -8,6 +8,7 @@ Quix Streams supports multiple serialization formats to exchange data between Ka - `double` - `json` - `avro` +- `protobuf` The serialization settings are defined per-topic using these parameters of `Application.topic()` function: @@ -103,3 +104,29 @@ app = Application(broker_address='localhost:9092', consumer_group='consumer') input_topic = app.topic('input', value_deserializer=AvroDeserializer(schema=MY_SCHEMA)) output_topic = app.topic('output', value_serializer=AvroSerializer(schema=MY_SCHEMA)) ``` + +## Protobuf +Protocol Buffers are language-neutral, platform-neutral extensible mechanisms for serializing structured data. + +You can learn more about the Protocol Buffers format [here](https://protobuf.dev/). +The Protobuf serializer and deserializer need to be passed explicitly. + +In the current version, the schema must be provided manually. + +> ***WARNING***: The protobuf serializer and deserializer require the protobuf library. 
+> You can install quixstreams with the necessary dependencies using +> `pip install quixstreams[protobuf]` + +```python +from quixstreams import Application +from quixstreams.models.serializers.protobuf import ProtobufSerializer, ProtobufDeserializer + +from my_input_models_pb2 import InputProto +from my_output_models_pb2 import OutputProto + +app = Application(broker_address='localhost:9092', consumer_group='consumer') +input_topic = app.topic('input', value_deserializer=ProtobufDeserializer(msg_type=InputProto)) +output_topic = app.topic('output', value_serializer=ProtobufSerializer(msg_type=OutputProto)) +``` + +By default, the protobuf deserializer will deserialize the message to a Python dictionary. This conversion has a significant performance cost. You can disable this behavior by initializing the deserializer with `to_dict` set to `False`. The protobuf message object will then be used as the message value, limiting the available StreamingDataframe API. diff --git a/pyproject.toml b/pyproject.toml index b868c6c72..3f6edaaf2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,10 +28,12 @@ Homepage = "https://github.com/quixio/quix-streams" [project.optional-dependencies] all = [ - "fastavro>=1.8,<2.0" + "fastavro>=1.8,<2.0", + "protobuf>=5.27.2,<6.0" ] avro = ["fastavro>=1.8,<2.0"] +protobuf = ["protobuf>=5.27.2,<6.0"] [tool.setuptools.packages.find] include = ["quixstreams*"] diff --git a/quixstreams/models/serializers/protobuf.py b/quixstreams/models/serializers/protobuf.py new file mode 100644 index 000000000..67befc39a --- /dev/null +++ b/quixstreams/models/serializers/protobuf.py @@ -0,0 +1,100 @@ +from typing import Union, Mapping, Iterable, Dict + +from .base import Serializer, Deserializer, SerializationContext +from .exceptions import SerializationError + +from google.protobuf.message import Message, DecodeError, EncodeError +from google.protobuf.json_format import MessageToDict, ParseDict, ParseError + +__all__ = ("ProtobufSerializer", "ProtobufDeserializer") + 
+ +class ProtobufSerializer(Serializer): + def __init__( + self, + msg_type: Message, + deterministic: bool = False, + ignore_unknown_fields: bool = False, + ): + """ + Serializer that returns data in protobuf format. + + Serialisation from a python dictionary can have a significant performance impact. An alternative is to pass the serializer an object of the `msg_type` class. + + :param msg_type: protobuf message class. + :param deterministic: If true, requests deterministic serialization of the protobuf, with predictable ordering of map keys + Default - `False` + :param ignore_unknown_fields: If True, do not raise errors for unknown fields. + Default - `False` + """ + super().__init__() + self._msg_type = msg_type + + self._deterministic = deterministic + self._ignore_unknown_fields = ignore_unknown_fields + + def __call__( + self, value: Union[Dict, Message], ctx: SerializationContext + ) -> Union[str, bytes]: + + try: + if isinstance(value, self._msg_type): + return value.SerializeToString(deterministic=self._deterministic) + + msg = self._msg_type() + return ParseDict( + value, msg, ignore_unknown_fields=self._ignore_unknown_fields + ).SerializeToString(deterministic=self._deterministic) + except (EncodeError, ParseError) as exc: + raise SerializationError(str(exc)) from exc + + +class ProtobufDeserializer(Deserializer): + def __init__( + self, + msg_type: Message, + use_integers_for_enums: bool = False, + preserving_proto_field_name: bool = False, + to_dict: bool = True, + ): + """ + Deserializer that parses protobuf data into a dictionary suitable for a StreamingDataframe. + + Deserialisation to a python dictionary can have a significant performance impact. You can disable this behavior using `to_dict`, in that case the protobuf message will be used as the StreamingDataframe row value. + + :param msg_type: protobuf message class. + :param use_integers_for_enums: If true, use integers instead of enum names. 
+ Default - `False` + :param preserving_proto_field_name: If True, use the original proto field names as + defined in the .proto file. If False, convert the field names to + lowerCamelCase. + Default - `False` + :param to_dict: If false, return the protobuf message instead of a dict. + Default - `True` + """ + super().__init__() + self._msg_type = msg_type + self._to_dict = to_dict + + self._use_integers_for_enums = use_integers_for_enums + self._preserving_proto_field_name = preserving_proto_field_name + + def __call__( + self, value: bytes, ctx: SerializationContext + ) -> Union[Iterable[Mapping], Mapping, Message]: + msg = self._msg_type() + + try: + msg.ParseFromString(value) + except DecodeError as exc: + raise SerializationError(str(exc)) from exc + + if not self._to_dict: + return msg + + return MessageToDict( + msg, + always_print_fields_with_no_presence=True, + use_integers_for_enums=self._use_integers_for_enums, + preserving_proto_field_name=self._preserving_proto_field_name, + ) diff --git a/tests/requirements.txt b/tests/requirements.txt index 65b64e66c..5d10f0287 100644 --- a/tests/requirements.txt +++ b/tests/requirements.txt @@ -3,4 +3,5 @@ testcontainers==4.5.1; python_version >= '3.9' pytest requests>=2.32 docker>=7.1.0 # Required to use requests>=2.32 -fastavro>=1.8,<2.0 \ No newline at end of file +fastavro>=1.8,<2.0 +protobuf>=5.27.2 diff --git a/tests/test_quixstreams/test_models/protobuf/test.proto b/tests/test_quixstreams/test_models/protobuf/test.proto new file mode 100644 index 000000000..6b600d844 --- /dev/null +++ b/tests/test_quixstreams/test_models/protobuf/test.proto @@ -0,0 +1,14 @@ +syntax = "proto3"; + +package test; + +enum TestEnum { + A = 0; + B = 1; +} + +message Test { + string name = 1; + int32 id = 2; + TestEnum enum = 3; +} diff --git a/tests/test_quixstreams/test_models/protobuf/test_pb2.py b/tests/test_quixstreams/test_models/protobuf/test_pb2.py new file mode 100644 index 000000000..e8bb7a831 --- /dev/null +++ 
b/tests/test_quixstreams/test_models/protobuf/test_pb2.py @@ -0,0 +1,46 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: tests/test_quixstreams/test_models/protobuf/test.proto +"""Generated protocol buffer code.""" +from google.protobuf.internal import enum_type_wrapper +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database + +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( + b'\n6tests/test_quixstreams/test_models/protobuf/test.proto\x12\x04test">\n\x04Test\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\n\n\x02id\x18\x02 \x01(\x05\x12\x1c\n\x04\x65num\x18\x03 \x01(\x0e\x32\x0e.test.TestEnum*\x18\n\x08TestEnum\x12\x05\n\x01\x41\x10\x00\x12\x05\n\x01\x42\x10\x01\x62\x06proto3' +) + +_TESTENUM = DESCRIPTOR.enum_types_by_name["TestEnum"] +TestEnum = enum_type_wrapper.EnumTypeWrapper(_TESTENUM) +A = 0 +B = 1 + + +_TEST = DESCRIPTOR.message_types_by_name["Test"] +Test = _reflection.GeneratedProtocolMessageType( + "Test", + (_message.Message,), + { + "DESCRIPTOR": _TEST, + "__module__": "tests.test_quixstreams.test_models.protobuf.test_pb2", + # @@protoc_insertion_point(class_scope:test.Test) + }, +) +_sym_db.RegisterMessage(Test) + +if _descriptor._USE_C_DESCRIPTORS == False: + + DESCRIPTOR._options = None + _TESTENUM._serialized_start = 128 + _TESTENUM._serialized_end = 152 + _TEST._serialized_start = 64 + _TEST._serialized_end = 126 +# @@protoc_insertion_point(module_scope) diff --git a/tests/test_quixstreams/test_models/test_serializers.py b/tests/test_quixstreams/test_models/test_serializers.py index 4d24f8379..f390442f5 100644 --- a/tests/test_quixstreams/test_models/test_serializers.py +++ 
b/tests/test_quixstreams/test_models/test_serializers.py @@ -18,10 +18,16 @@ DoubleDeserializer, StringDeserializer, ) -from .utils import int_to_bytes, float_to_bytes +from quixstreams.models.serializers.protobuf import ( + ProtobufSerializer, + ProtobufDeserializer, +) from quixstreams.models.serializers.avro import AvroDeserializer, AvroSerializer +from .utils import int_to_bytes, float_to_bytes +from .protobuf.test_pb2 import Test + AVRO_TEST_SCHEMA = { "type": "record", "name": "testschema", @@ -31,6 +37,7 @@ ], } + dummy_context = SerializationContext(topic="topic") JSONSCHEMA_TEST_SCHEMA = { @@ -74,6 +81,22 @@ class TestSerializers: b"\x06foo\xf6\x01", ), (AvroSerializer(AVRO_TEST_SCHEMA), {"name": "foo"}, b"\x06foo\x00"), + (ProtobufSerializer(Test), {}, b""), + (ProtobufSerializer(Test), {"id": 3}, b"\x10\x03"), + (ProtobufSerializer(Test), {"name": "foo", "id": 2}, b"\n\x03foo\x10\x02"), + (ProtobufSerializer(Test), Test(name="foo", id=2), b"\n\x03foo\x10\x02"), + # Both values are supported for enum + ( + ProtobufSerializer(Test), + {"name": "foo", "id": 2, "enum": "B"}, + b"\n\x03foo\x10\x02\x18\x01", + ), + ( + ProtobufSerializer(Test), + {"name": "foo", "id": 2, "enum": 1}, + b"\n\x03foo\x10\x02\x18\x01", + ), + (ProtobufSerializer(Test), {"name": "foo"}, b"\n\x03foo"), ], ) def test_serialize_success(self, serializer: Serializer, value, expected): @@ -103,6 +126,7 @@ def test_serialize_success(self, serializer: Serializer, value, expected): (AvroSerializer(AVRO_TEST_SCHEMA), {"foo": "foo", "id": 123}), (AvroSerializer(AVRO_TEST_SCHEMA), {"id": 123}), (AvroSerializer(AVRO_TEST_SCHEMA, strict=True), {"name": "foo"}), + (ProtobufSerializer(Test), {"bar": 3}), ], ) def test_serialize_error(self, serializer: Serializer, value): @@ -151,6 +175,36 @@ class TestDeserializers: b"\x06foo\x00", {"name": "foo", "id": 0}, ), + ( + ProtobufDeserializer(Test), + b"\n\x03foo\x10\x02", + {"enum": "A", "name": "foo", "id": 2}, + ), + ( + ProtobufDeserializer(Test, 
to_dict=False), + b"\n\x03foo\x10\x02", + Test(name="foo", id=2), + ), + ( + ProtobufDeserializer(Test, use_integers_for_enums=True), + b"\n\x03foo\x10\x02", + {"enum": 0, "name": "foo", "id": 2}, + ), + ( + ProtobufDeserializer(Test), + b"\n\x03foo", + { + "enum": "A", + "name": "foo", + "id": 0, + }, + ), + ( + ProtobufDeserializer(Test), + b"\x10\x03", + {"enum": "A", "name": "", "id": 3}, + ), + (ProtobufDeserializer(Test), b"", {"enum": "A", "name": "", "id": 0}), ], ) def test_deserialize_no_column_name_success( @@ -176,6 +230,7 @@ def test_deserialize_no_column_name_success( b'{"id":10}', ), (AvroDeserializer(AVRO_TEST_SCHEMA), b"\x26foo\x00"), + (ProtobufDeserializer(Test), b"\n\x03foo\x10\x02\x13"), ], ) def test_deserialize_error(self, deserializer: Deserializer, value):