Skip to content

Commit

Permalink
Avro serdes support
Browse files Browse the repository at this point in the history
  • Loading branch information
quentin-quix committed Jul 22, 2024
1 parent 8b20c0c commit a31f2e0
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 1 deletion.
9 changes: 8 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "quixstreams"
dynamic = ["version", "dependencies", "optional-dependencies"]
dynamic = ["version", "dependencies"]
description = "Python library for building stream processing applications with Apache Kafka"
license = {file = "LICENSE"}
readme = "README.md"
Expand All @@ -26,6 +26,13 @@ classifiers = [
[project.urls]
Homepage = "https://github.com/quixio/quix-streams"

[project.optional-dependencies]
all = [
"fastavro"
]

avro = ["fastavro"]

[tool.setuptools.packages.find]
include = ["quixstreams*"]
exclude = ["tests*", "docs*", "examples*"]
Expand Down
58 changes: 58 additions & 0 deletions quixstreams/models/serializers/avro.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
from typing import Union, Mapping, Optional, Any, Iterable

from io import BytesIO

from fastavro import schemaless_reader, schemaless_writer, parse_schema
from fastavro.types import Schema

from .base import Serializer, Deserializer, SerializationContext
from .exceptions import SerializationError

__all__ = ("AvroSerializer", "AvroDeserializer")


class AvroSerializer(Serializer):
def __init__(self, schema: Schema, **kwargs):
"""
Serializer that returns data in Avro format.
Additional keyword arguments are passed directly to fastavro `schemaless_writer` method.
:param schema: The avro schema.
"""
self._schema = parse_schema(schema)
self._kwargs = kwargs or {}

def __call__(self, value: Any, ctx: SerializationContext) -> bytes:
data = BytesIO()

try:
schemaless_writer(data, self._schema, value, self._kwargs)
except ValueError as exc:
raise SerializationError(str(exc)) from exc

result = data.getvalue()
data.close()
return result


class AvroDeserializer(Deserializer):
def __init__(self, schema: Schema, **kwargs):
"""
Deserializer that parses data from Avro.
Additional keyword arguments are passed directly to fastavro `schemaless_reader` method.
:param schema: The Avro schema.
"""
super().__init__()
self._schema = parse_schema(schema)
self._kwargs = kwargs or {}

def __call__(
self, value: bytes, ctx: SerializationContext
) -> Union[Iterable[Mapping], Mapping]:
try:
return schemaless_reader(BytesIO(value), self._schema, **self._kwargs)
except EOFError as exc:
raise SerializationError(str(exc)) from exc
1 change: 1 addition & 0 deletions tests/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ testcontainers==4.5.1; python_version >= '3.9'
pytest
requests>=2.32
docker>=7.1.0 # Required to use requests>=2.32
fastavro
31 changes: 31 additions & 0 deletions tests/test_quixstreams/test_models/test_serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,17 @@
)
from .utils import int_to_bytes, float_to_bytes

from quixstreams.models.serializers.avro import AvroDeserializer, AvroSerializer

AVRO_TEST_SCHEMA = {
"type": "record",
"name": "testschema",
"fields": [
{"name": "name", "type": "string"},
{"name": "id", "type": "int", "default": 0},
],
}

dummy_context = SerializationContext(topic="topic")


Expand All @@ -34,6 +45,12 @@ class TestSerializers:
(BytesSerializer(), b"abc", b"abc"),
(JSONSerializer(), {"a": 123}, b'{"a":123}'),
(JSONSerializer(), [1, 2, 3], b"[1,2,3]"),
(
AvroSerializer(AVRO_TEST_SCHEMA),
{"name": "foo", "id": 123},
b"\x06foo\xf6\x01",
),
(AvroSerializer(AVRO_TEST_SCHEMA), {"name": "foo"}, b"\x06foo\x00"),
],
)
def test_serialize_success(self, serializer: Serializer, value, expected):
Expand All @@ -50,6 +67,9 @@ def test_serialize_success(self, serializer: Serializer, value, expected):
(StringSerializer(), {"a": 123}),
(JSONSerializer(), object()),
(JSONSerializer(), complex(1, 2)),
(AvroSerializer(AVRO_TEST_SCHEMA), {"foo": "foo", "id": 123}),
(AvroSerializer(AVRO_TEST_SCHEMA), {"id": 123}),
(AvroSerializer(AVRO_TEST_SCHEMA, strict=True), {"name": "foo"}),
],
)
def test_serialize_error(self, serializer: Serializer, value):
Expand All @@ -70,6 +90,16 @@ class TestDeserializers:
(BytesDeserializer(), b"123123", b"123123"),
(JSONDeserializer(), b"123123", 123123),
(JSONDeserializer(), b'{"a":"b"}', {"a": "b"}),
(
AvroDeserializer(AVRO_TEST_SCHEMA),
b"\x06foo\xf6\x01",
{"name": "foo", "id": 123},
),
(
AvroDeserializer(AVRO_TEST_SCHEMA),
b"\x06foo\x00",
{"name": "foo", "id": 0},
),
],
)
def test_deserialize_no_column_name_success(
Expand All @@ -84,6 +114,7 @@ def test_deserialize_no_column_name_success(
(IntegerDeserializer(), b'{"abc": "abc"}'),
(DoubleDeserializer(), b"abc"),
(JSONDeserializer(), b"{"),
(AvroDeserializer(AVRO_TEST_SCHEMA), b"\x26foo\x00"),
],
)
def test_deserialize_error(self, deserializer: Deserializer, value):
Expand Down

0 comments on commit a31f2e0

Please sign in to comment.