Add support for reading and writing avro files #367

Open · wants to merge 13 commits into develop
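This PR adds `AvroReader` and `AvroWriter` nodes backed by the optional `fastavro` dependency and registers them in the public API next to the existing CSV/JSON/pickle nodes. A minimal usage sketch (not part of the diff; the file names are hypothetical and `fastavro` must be installed):

```python
import bonobo

def get_graph():
    graph = bonobo.Graph()
    graph.add_chain(
        bonobo.AvroReader('weather.avro'),                # yields one row per Avro record
        bonobo.AvroWriter('copy.avro', codec='deflate'),  # re-serializes rows, guessing a schema if none is given
    )
    return graph

def get_services():
    # Both nodes resolve their paths against the 'fs' filesystem service.
    return {'fs': bonobo.open_fs()}

if __name__ == '__main__':
    bonobo.run(get_graph(), services=get_services())
```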
1 change: 1 addition & 0 deletions Projectfile
@@ -57,6 +57,7 @@ python.add_requirements(
'cookiecutter >=1.5,<1.6',
'pytest-timeout >=1,<2',
'sphinx-sitemap >=0.2,<0.3',
'fastavro>=0.22.9',
],
docker=[
'bonobo-docker ~=0.6.0a1',
2 changes: 2 additions & 0 deletions bonobo/_api.py
@@ -142,6 +142,8 @@ def open_fs(fs_url=None, *args, **kwargs):

# standard transformations
api.register_group(
    AvroReader,
    AvroWriter,
    CsvReader,
    CsvWriter,
    FileReader,
3 changes: 3 additions & 0 deletions bonobo/nodes/io/__init__.py
@@ -4,6 +4,7 @@
from .file import FileReader, FileWriter
from .json import JsonReader, JsonWriter, LdjsonReader, LdjsonWriter
from .pickle import PickleReader, PickleWriter
from .avro import AvroReader, AvroWriter

__all__ = [
"CsvReader",
@@ -16,4 +17,6 @@
"LdjsonWriter",
"PickleReader",
"PickleWriter",
"AvroReader",
"AvroWriter",
]
205 changes: 205 additions & 0 deletions bonobo/nodes/io/avro.py
@@ -0,0 +1,205 @@
from datetime import datetime, date, timedelta, time
from decimal import Decimal

from bonobo.config import Option, use_context
from bonobo.nodes.io.base import FileHandler
from bonobo.nodes.io.file import FileReader, FileWriter

# Avro support is optional and shouldn't break bonobo when fastavro is not installed.
try:
    import fastavro
except ImportError:
    pass

# Helpers

def assure_same_len(props, values):
    if len(props) != len(values):
        m = "Values length differs from input fields length. Expected: {}. Got: {}. Values: {!r}."
        f = m.format(len(props), len(values), values)
        raise ValueError(f)


def build_schema_from_values(props, values):
    # https://avro.apache.org/docs/current/spec.html#schema_primitive
    assure_same_len(props, values)
    schema_fields = []
    for p, v in zip(props, values):
        # Note: bool is checked before int (bool is a subclass of int) and
        # datetime before date (datetime is a subclass of date).
        if isinstance(v, bool):
            f = {'name': p, 'type': 'boolean'}
        elif isinstance(v, int):
            f = {'name': p, 'type': 'long'}
        elif isinstance(v, float):
            f = {'name': p, 'type': 'double'}
        elif isinstance(v, datetime):
            f = {'name': p, 'type': 'long', 'logicalType': 'timestamp-micros'}
        elif isinstance(v, date):
            f = {'name': p, 'type': 'int', 'logicalType': 'date'}
        elif isinstance(v, (timedelta, time)):
            f = {'name': p, 'type': 'long', 'logicalType': 'time-micros'}
        elif isinstance(v, Decimal):
            f = {'name': p, 'type': 'double'}
        elif isinstance(v, bytes):
            f = {'name': p, 'type': 'bytes'}
        else:
            f = {'name': p, 'type': 'string'}
        schema_fields.append(f)
    return schema_fields


def get_same_value(value):
    return value


def get_value_as_date(value):
    # Avro 'date' logical type: days since the Unix epoch.
    diff = value - date(1970, 1, 1)
    return diff.days


def get_value_as_datetime(value):
    # Avro 'timestamp-micros' logical type: microseconds since the Unix epoch.
    return int(value.timestamp() * 1000000)


def get_value_as_timedelta(value):
    # Avro 'time-micros' logical type: the duration expressed in microseconds.
    return (value.days * 86400000000) + (value.seconds * 1000000) + value.microseconds


def get_value_as_time(value):
    # Avro 'time-micros' logical type: microseconds since midnight.
    return (value.hour * 3600000000) + (value.minute * 60000000) + (value.second * 1000000) + value.microsecond


def get_value_as_float(value):
    return float(value)


def build_schema_from(props, values):
    schema_fields = build_schema_from_values(props, values)
    schema = {
        'type': 'record',
        'name': 'output',
        'namespace': "avro",
        'doc': "generated by bonobo",
        'fields': schema_fields,
    }
    return schema


def build_converters_from_values(values):
    converters = []
    for v in values:
        if isinstance(v, datetime):
            f = get_value_as_datetime
        elif isinstance(v, time):
            f = get_value_as_time
        elif isinstance(v, timedelta):
            f = get_value_as_timedelta
        elif isinstance(v, date):
            f = get_value_as_date
        elif isinstance(v, Decimal):
            f = get_value_as_float
        else:
            f = get_same_value
        converters.append(f)
    return converters


# Exported classes


class AvroHandler(FileHandler):
    schema = Option(
        dict,
        required=False,
        __doc__="""
        A dict specifying the schema fields according to the Avro spec, for example:

            schema = {
                'doc': 'A weather reading.',
                'name': 'Weather',
                'namespace': 'test',
                'type': 'record',
                'fields': [
                    {'name': 'station', 'type': 'string'},
                    {'name': 'date', 'type': 'int', 'logicalType': 'date'},
                    {'name': 'time', 'type': 'long', 'logicalType': 'time-micros'},
                    {'name': 'temp', 'type': 'int'},
                ],
            }

        See: https://avro.apache.org/docs/current/spec.html#schema_primitive
        and https://avro.apache.org/docs/current/spec.html#Logical+Types
        """,
    )

    codec = Option(
        str,
        required=False,
        default="null",
        __doc__="The name of the compression codec used to compress blocks. "
        "Can be 'null', 'deflate' or 'snappy' (if installed).",
    )


@use_context
class AvroReader(FileReader, AvroHandler):
    """
    Reads the records from an Avro file and yields each record's values as a tuple.
    """

    mode = Option(str, default="rb")

    def read(self, file, context, *, fs):
        avro_reader = fastavro.reader(file)
        if not context.output_type:
            # Expose the writer schema's field names as the output fields.
            file_schema = avro_reader.writer_schema
            schema_fields = file_schema['fields']
            field_names = [col['name'] for col in schema_fields]
            context.set_output_fields(tuple(field_names))
            self.schema = file_schema
            self.codec = avro_reader.codec

        for record in avro_reader:
            yield tuple(record.values())

    __call__ = read


@use_context
class AvroWriter(FileWriter, AvroHandler):
    """
    Writes the incoming values as records into an Avro file, according to the fields defined in the schema.

    When no schema is given, the types are guessed from the values of the
    fields present in the first record. The values are written following
    Python's own type system; take care when writing data extracted from SQL
    databases, as the types are often affected by driver issues, type
    mismatches, incorrect mappings and precision loss (especially floats and
    decimals).
    """

    compression_level = Option(int, required=False,
        __doc__="Compression level to use when the specified codec supports it")

    mode = Option(str, default="wb+")

    def write(self, file, context, *values, fs):
        """
        Write a single record to the opened file, using the schema stored in the context.
        """
        context.setdefault("schema", None)
        context.setdefault("converters", None)

        props = context.get_input_fields()
        if not context.schema:
            # Use the schema given as an option when available, otherwise guess one from the first record.
            detected = self.schema or build_schema_from(props, values)
            context.schema = fastavro.parse_schema(detected)
        if not context.converters:
            context.converters = build_converters_from_values(values)

        row = {k: conv(v) for k, v, conv in zip(props, values, context.converters)}
        fastavro.writer(
            fo=file,
            schema=context.schema,
            records=[row],
            codec=self.codec,
            codec_compression_level=self.compression_level,
        )

    __call__ = write

# end of file #
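For reference (not part of the diff): given the first row written in `test_write_records_to_avro_file` below, the schema guessing in `build_schema_from_values` would produce roughly the following fields:

```python
# Input fields:  ("name", "age", "score", "smart", "birthday", "registered")
# First values:  ("john", 7, 0.7, False, date(2012, 10, 11), datetime(2018, 9, 14, 15, 16, 17))
[
    {'name': 'name', 'type': 'string'},
    {'name': 'age', 'type': 'long'},
    {'name': 'score', 'type': 'double'},
    {'name': 'smart', 'type': 'boolean'},
    {'name': 'birthday', 'type': 'int', 'logicalType': 'date'},
    {'name': 'registered', 'type': 'long', 'logicalType': 'timestamp-micros'},
]
```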
1 change: 1 addition & 0 deletions requirements-dev.txt
@@ -12,6 +12,7 @@ click==7.0
cookiecutter==1.5.1
coverage==4.5.3
docutils==0.14
fastavro>=0.22.9
future==0.17.1
idna==2.8
imagesize==1.1.0
105 changes: 105 additions & 0 deletions tests/nodes/io/test_avro.py
@@ -0,0 +1,105 @@
import pytest
from datetime import datetime, date, timedelta

from bonobo import AvroReader, AvroWriter
from bonobo.constants import EMPTY
from bonobo.execution.contexts.node import NodeExecutionContext
from bonobo.util.testing import BufferingNodeExecutionContext, FilesystemTester

avro_tester = FilesystemTester("avro", mode="wb")


def is_fastavro_missing():
    try:
        import fastavro
        return False
    except ImportError:
        return True


def test_write_records_to_avro_file(tmpdir):
    if is_fastavro_missing():
        return
    fs, filename, services = avro_tester.get_services_for_writer(tmpdir)
    writav = AvroWriter(filename, codec='deflate', compression_level=6)
    john = ("john", 7, 0.7, False, date(2012, 10, 11), datetime(2018, 9, 14, 15, 16, 17))
    jane = ("jane", 17, 1.7, True, date(2002, 11, 12), datetime(2015, 12, 13, 14, 15, 16))
    jack = ("jack", 27, 2.7, True, date(1992, 12, 13), datetime(2010, 11, 12, 13, 14, 15))
    joel = ("joel", 37, 3.7, True, date(1982, 12, 25), datetime(2009, 10, 11, 12, 13, 14))
    with NodeExecutionContext(writav, services=services) as context:
        context.set_input_fields(["name", "age", "score", "smart", "birthday", "registered"])
        context.write_sync(john, jane, jack, joel)


def test_write_with_schema_to_avro_file(tmpdir):
    if is_fastavro_missing():
        return
    fs, filename, services = avro_tester.get_services_for_writer(tmpdir)
    custom_schema = {
        'doc': 'Some random people.',
        'name': 'Crowd',
        'namespace': 'test',
        'type': 'record',
        'fields': [
            # Field names must match the input fields set on the context below.
            {'name': 'name', 'type': 'string'},
            {'name': 'age', 'type': 'int'},
            {'name': 'birthday', 'type': 'int', 'logicalType': 'date'},
            {'name': 'registered', 'type': 'long', 'logicalType': 'timestamp-micros'},
        ],
    }
    writav = AvroWriter(filename, schema=custom_schema, codec='deflate', compression_level=6)
    pete = ("pete", 7, date(2012, 10, 11), datetime(2018, 9, 14, 15, 16, 17))
    mike = ("mike", 17, date(2002, 11, 12), datetime(2015, 12, 13, 14, 15, 16))
    zack = ("zack", 27, date(1992, 12, 13), datetime(2010, 11, 12, 13, 14, 15))
    gene = ("gene", 37, date(1982, 12, 25), datetime(2009, 10, 11, 12, 13, 14))
    with NodeExecutionContext(writav, services=services) as context:
        context.set_input_fields(["name", "age", "birthday", "registered"])
        context.write_sync(pete, mike, zack, gene)


def create_avro_example(path):
    import fastavro
    schema = {
        'doc': 'A weather reading.',
        'name': 'Weather',
        'namespace': 'test',
        'type': 'record',
        'fields': [
            {'name': 'station', 'type': 'string'},
            {'name': 'time', 'type': 'long'},
            {'name': 'temp', 'type': 'int'},
        ],
    }
    parsed_schema = fastavro.parse_schema(schema)
    records = [
        {u'station': u'cold', u'temp': 0, u'time': 1433269388},
        {u'station': u'warm', u'temp': 22, u'time': 1433270389},
        {u'station': u'frozen', u'temp': -11, u'time': 1433273379},
        {u'station': u'hot', u'temp': 111, u'time': 1433275478},
    ]
    with open(path, 'wb') as out:
        fastavro.writer(out, parsed_schema, records)


def test_read_records_from_avro_file(tmpdir):
    if is_fastavro_missing():
        return
    dst = tmpdir.strpath + '/output.avro'
    create_avro_example(dst)
    fs, filename, services = avro_tester.get_services_for_writer(tmpdir)
    readav = AvroReader(filename)
    with BufferingNodeExecutionContext(readav, services=services) as context:
        context.write_sync(EMPTY)
    props = context.get_output_fields()
    assert props == ("station", "time", "temp")

# end of test file
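For manual verification outside the test suite (not part of the PR), a file produced by `AvroWriter` can be inspected directly with fastavro; the 'output.avro' name below is an assumption about the filename `FilesystemTester` generates:

```python
import fastavro

with open('output.avro', 'rb') as fo:
    reader = fastavro.reader(fo)
    print(reader.writer_schema['fields'])  # the schema the writer was given or guessed
    for record in reader:
        print(record)                      # one dict per written row
```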