Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avro serdes support #407

Merged
merged 6 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions LICENSES/LICENSE.fastavro
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2011 Miki Tebeka

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
29 changes: 29 additions & 0 deletions docs/advanced/serialization.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ Quix Streams supports multiple serialization formats to exchange data between Ka
- `integer`
- `double`
- `json`
- `avro`

The serialization settings are defined per-topic using these parameters of `Application.topic()` function:

Expand Down Expand Up @@ -72,3 +73,31 @@ app = Application(broker_address='localhost:9092', consumer_group='consumer')
input_topic = app.topic('input', value_deserializer=JSONDeserializer(schema=MY_SCHEMA))
output_topic = app.topic('output', value_serializer=JSONSerializer(schema=MY_SCHEMA))
```

## Avro

The Avro serializer and deserializer need to be passed explicitly.

In the current version, the schema must be provided manually.

> ***WARNING***: The Avro serializer and deserializer require the `fastavro` library.
> You can install quixstreams with the necessary dependencies using
> `pip install quixstreams[avro]`

```python
from quixstreams import Application
from quixstreams.models.serializers.avro import AvroSerializer, AvroDeserializer

MY_SCHEMA = {
"type": "record",
"name": "testschema",
"fields": [
{"name": "name", "type": "string"},
{"name": "id", "type": "int", "default": 0},
],
}

app = Application(broker_address='localhost:9092', consumer_group='consumer')
input_topic = app.topic('input', value_deserializer=AvroDeserializer(schema=MY_SCHEMA))
output_topic = app.topic('output', value_serializer=AvroSerializer(schema=MY_SCHEMA))
```
9 changes: 8 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "quixstreams"
dynamic = ["version", "dependencies", "optional-dependencies"]
dynamic = ["version", "dependencies"]
description = "Python library for building stream processing applications with Apache Kafka"
license = {file = "LICENSE"}
readme = "README.md"
Expand All @@ -26,6 +26,13 @@ classifiers = [
[project.urls]
Homepage = "https://github.com/quixio/quix-streams"

[project.optional-dependencies]
all = [
"fastavro>=1.8,<2.0"
]

avro = ["fastavro>=1.8,<2.0"]

[tool.setuptools.packages.find]
include = ["quixstreams*"]
exclude = ["tests*", "docs*", "examples*"]
Expand Down
113 changes: 113 additions & 0 deletions quixstreams/models/serializers/avro.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
from typing import Union, Mapping, Optional, Any, Iterable

from io import BytesIO

from fastavro import schemaless_reader, schemaless_writer, parse_schema
from fastavro.types import Schema

from .base import Serializer, Deserializer, SerializationContext
from .exceptions import SerializationError

__all__ = ("AvroSerializer", "AvroDeserializer")


class AvroSerializer(Serializer):
    """
    Serializer that encodes values as schemaless Avro using `fastavro`.

    The schema is parsed once at construction time and reused for every call.
    """

    def __init__(
        self,
        schema: Schema,
        strict: bool = False,
        strict_allow_default: bool = False,
        disable_tuple_notation: bool = False,
    ):
        """
        Serializer that returns data in Avro format.

        For more information see fastavro [schemaless_writer](https://fastavro.readthedocs.io/en/latest/writer.html#fastavro._write_py.schemaless_writer) method.

        :param schema: The avro schema.
        :param strict: If set to True, an error will be raised if records do not contain exactly the same fields that the schema states.
            Default - `False`
        :param strict_allow_default: If set to True, an error will be raised if records do not contain exactly the same fields that the schema states unless it is a missing field that has a default value in the schema.
            Default - `False`
        :param disable_tuple_notation: If set to True, tuples will not be treated as a special case. Therefore, using a tuple to indicate the type of a record will not work.
            Default - `False`
        """
        self._schema = parse_schema(schema)
        self._strict = strict
        self._strict_allow_default = strict_allow_default
        self._disable_tuple_notation = disable_tuple_notation

    def __call__(self, value: Any, ctx: SerializationContext) -> bytes:
        """
        Encode `value` with the configured schema and return the raw bytes.

        :param value: The object to serialize.
        :param ctx: The serialization context; not used by this serializer.
        :return: The Avro-encoded payload.
        :raises SerializationError: if `schemaless_writer` raises `ValueError`.
        """
        with BytesIO() as data:
            try:
                schemaless_writer(
                    data,
                    self._schema,
                    value,
                    strict=self._strict,
                    strict_allow_default=self._strict_allow_default,
                    disable_tuple_notation=self._disable_tuple_notation,
                )
            except ValueError as exc:
                raise SerializationError(str(exc)) from exc

            # Read the buffer before the `with` block closes it; calling
            # getvalue() on a closed BytesIO raises ValueError.
            return data.getvalue()


class AvroDeserializer(Deserializer):
    """
    Deserializer that decodes schemaless Avro payloads using `fastavro`.

    The writer schema (and the optional reader schema) are parsed once at
    construction time and reused for every call.
    """

    def __init__(
        self,
        schema: Schema,
        reader_schema: Optional[Schema] = None,
        return_record_name: bool = False,
        return_record_name_override: bool = False,
        return_named_type: bool = False,
        return_named_type_override: bool = False,
        handle_unicode_errors: str = "strict",
    ):
        """
        Deserializer that parses data from Avro.

        For more information see fastavro [schemaless_reader](https://fastavro.readthedocs.io/en/latest/reader.html#fastavro._read_py.schemaless_reader) method.

        :param schema: The Avro schema.
        :param reader_schema: If the schema has changed since being written then the new schema can be given to allow for schema migration.
            Default - `None`
        :param return_record_name: If true, when reading a union of records, the result will be a tuple where the first value is the name of the record and the second value is the record itself.
            Default - `False`
        :param return_record_name_override: If true, this will modify the behavior of return_record_name so that the record name is only returned for unions where there is more than one record. For unions that only have one record, this option will make it so that the record is returned by itself, not a tuple with the name.
            Default - `False`
        :param return_named_type: If true, when reading a union of named types, the result will be a tuple where the first value is the name of the type and the second value is the record itself NOTE: Using this option will ignore return_record_name and return_record_name_override.
            Default - `False`
        :param return_named_type_override: If true, this will modify the behavior of return_named_type so that the named type is only returned for unions where there is more than one named type. For unions that only have one named type, this option will make it so that the named type is returned by itself, not a tuple with the name.
            Default - `False`
        :param handle_unicode_errors: Should be set to a valid string that can be used in the errors argument of the string decode() function.
            Default - `"strict"`
        """
        super().__init__()
        self._schema = parse_schema(schema)
        # A falsy reader schema (None or empty) is treated as "not provided".
        self._reader_schema = parse_schema(reader_schema) if reader_schema else None
        self._return_record_name = return_record_name
        self._return_record_name_override = return_record_name_override
        self._return_named_type = return_named_type
        self._return_named_type_override = return_named_type_override
        self._handle_unicode_errors = handle_unicode_errors

    def __call__(
        self, value: bytes, ctx: SerializationContext
    ) -> Union[Iterable[Mapping], Mapping]:
        """
        Decode an Avro-encoded payload with the configured schema(s).

        :param value: The raw Avro bytes to decode.
        :param ctx: The serialization context; not used by this deserializer.
        :return: The decoded object as produced by `schemaless_reader`.
        :raises SerializationError: if `schemaless_reader` raises `EOFError`.
        """
        try:
            return schemaless_reader(
                BytesIO(value),
                self._schema,
                reader_schema=self._reader_schema,
                return_record_name=self._return_record_name,
                return_record_name_override=self._return_record_name_override,
                return_named_type=self._return_named_type,
                return_named_type_override=self._return_named_type_override,
                handle_unicode_errors=self._handle_unicode_errors,
            )
        except EOFError as exc:
            raise SerializationError(str(exc)) from exc
1 change: 1 addition & 0 deletions tests/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ testcontainers==4.5.1; python_version >= '3.9'
pytest
requests>=2.32
docker>=7.1.0 # Required to use requests>=2.32
fastavro>=1.8,<2.0
31 changes: 31 additions & 0 deletions tests/test_quixstreams/test_models/test_serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,17 @@
)
from .utils import int_to_bytes, float_to_bytes

from quixstreams.models.serializers.avro import AvroDeserializer, AvroSerializer

AVRO_TEST_SCHEMA = {
"type": "record",
"name": "testschema",
"fields": [
{"name": "name", "type": "string"},
{"name": "id", "type": "int", "default": 0},
],
}

dummy_context = SerializationContext(topic="topic")

JSONSCHEMA_TEST_SCHEMA = {
Expand Down Expand Up @@ -57,6 +68,12 @@ class TestSerializers:
{"id": 10, "name": "foo"},
b'{"id":10,"name":"foo"}',
),
(
AvroSerializer(AVRO_TEST_SCHEMA),
{"name": "foo", "id": 123},
b"\x06foo\xf6\x01",
),
(AvroSerializer(AVRO_TEST_SCHEMA), {"name": "foo"}, b"\x06foo\x00"),
],
)
def test_serialize_success(self, serializer: Serializer, value, expected):
Expand All @@ -83,6 +100,9 @@ def test_serialize_success(self, serializer: Serializer, value, expected):
),
{"id": 10},
),
(AvroSerializer(AVRO_TEST_SCHEMA), {"foo": "foo", "id": 123}),
(AvroSerializer(AVRO_TEST_SCHEMA), {"id": 123}),
(AvroSerializer(AVRO_TEST_SCHEMA, strict=True), {"name": "foo"}),
],
)
def test_serialize_error(self, serializer: Serializer, value):
Expand Down Expand Up @@ -121,6 +141,16 @@ class TestDeserializers:
b'{"id":10,"name":"foo"}',
{"id": 10, "name": "foo"},
),
(
AvroDeserializer(AVRO_TEST_SCHEMA),
b"\x06foo\xf6\x01",
{"name": "foo", "id": 123},
),
(
AvroDeserializer(AVRO_TEST_SCHEMA),
b"\x06foo\x00",
{"name": "foo", "id": 0},
),
],
)
def test_deserialize_no_column_name_success(
Expand All @@ -145,6 +175,7 @@ def test_deserialize_no_column_name_success(
),
b'{"id":10}',
),
(AvroDeserializer(AVRO_TEST_SCHEMA), b"\x26foo\x00"),
],
)
def test_deserialize_error(self, deserializer: Deserializer, value):
Expand Down
Loading