feat: Adds ZipfileDecoder component #169

Open
wants to merge 11 commits into
base: main
50 changes: 50 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
@@ -1897,6 +1897,56 @@ definitions:
      $parameters:
        type: object
        additionalProperties: true
  ZipfileDecoder:
    title: Zipfile Decoder
    description: Decoder for response data that is returned as zipfile(s).
    type: object
    additionalProperties: true
    required:
      - type
    properties:
      type:
        type: string
        enum: [ZipfileDecoder]
      parser:
        title: Parser
        description: Parser to parse the decompressed data from the zipfile(s).
        anyOf:
          - "$ref": "#/definitions/JsonParser"
          - "$ref": "#/definitions/CustomParser"
  JsonParser:
    title: JsonParser
    description: Parser used for parsing str, bytes, or bytearray data and returning data in a dictionary format.
    type: object
    additionalProperties: true
    required:
      - type
    properties:
      type:
        type: string
        enum: [JsonParser]
  CustomParser:
    title: Custom Parser
    description: Use this to implement custom parser logic.
    type: object
    additionalProperties: true
    required:
      - type
      - class_name
    properties:
      type:
        type: string
        enum: [CustomParser]
      class_name:
        title: Class Name
        description: Fully-qualified name of the class that will be implementing the custom decoding. Has to be a sub class of Parser. The format is `source_<name>.<package>.<class_name>`.
        type: string
        additionalProperties: true
        examples:
          - "source_rivendell.components.ElvishParser"
      $parameters:
        type: object
        additionalProperties: true
  ListPartitionRouter:
    title: List Partition Router
    description: A Partition router that specifies a list of attributes where each attribute describes a portion of the complete data set for a stream. During a sync, each value is iterated over and can be used as input to outbound API requests.
3 changes: 2 additions & 1 deletion airbyte_cdk/sources/declarative/decoders/__init__.py
@@ -7,5 +7,6 @@
from airbyte_cdk.sources.declarative.decoders.noop_decoder import NoopDecoder
from airbyte_cdk.sources.declarative.decoders.pagination_decoder_decorator import PaginationDecoderDecorator
from airbyte_cdk.sources.declarative.decoders.xml_decoder import XmlDecoder
+from airbyte_cdk.sources.declarative.decoders.zipfile_decoder import ZipfileDecoder

-__all__ = ["Decoder", "JsonDecoder", "JsonlDecoder", "IterableDecoder", "GzipJsonDecoder", "NoopDecoder", "PaginationDecoderDecorator", "XmlDecoder"]
+__all__ = ["Decoder", "JsonDecoder", "JsonlDecoder", "IterableDecoder", "GzipJsonDecoder", "NoopDecoder", "PaginationDecoderDecorator", "XmlDecoder", "ZipfileDecoder"]
7 changes: 7 additions & 0 deletions airbyte_cdk/sources/declarative/decoders/parsers/__init__.py
@@ -0,0 +1,7 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.decoders.parsers.parsers import Parser, JsonParser

__all__ = ["Parser", "JsonParser"]
49 changes: 49 additions & 0 deletions airbyte_cdk/sources/declarative/decoders/parsers/parsers.py
@@ -0,0 +1,49 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

import json
import logging
from abc import abstractmethod
from dataclasses import InitVar, dataclass
from typing import Any, Generator, Mapping, MutableMapping, Union

logger = logging.getLogger("airbyte")


@dataclass
class Parser:
    """
    Parser strategy to convert str, bytes, or bytearray data into MutableMapping[str, Any].
    """

    @abstractmethod
    def parse(
        self, data: Union[str, bytes, bytearray]
    ) -> Generator[MutableMapping[str, Any], None, None]:
        pass


@dataclass
class JsonParser(Parser):
    """
    Parser strategy for converting JSON-structure str, bytes, or bytearray data into MutableMapping[str, Any].
    """

    parameters: InitVar[Mapping[str, Any]]

    def parse(
        self, data: Union[str, bytes, bytearray]
    ) -> Generator[MutableMapping[str, Any], None, None]:
        try:
            body_json = json.loads(data)
        except json.JSONDecodeError:
            logger.warning(f"Data cannot be parsed into json: {data=}")
            yield {}
Comment on lines +38 to +42
Contributor

⚠️ Potential issue

Prevent NameError after JSON decoding fails?

When a JSONDecodeError occurs, body_json isn't defined, but the code continues and may reference it later. Should we add a return statement after yielding {} to avoid potential NameError exceptions? Wdyt?

Consider updating the exception handling:

            except json.JSONDecodeError:
                logger.warning(f"Data cannot be parsed into json: {data=}")
                yield {}
+               return
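
A minimal sketch of the failure mode described above, using simplified stand-ins for `JsonParser.parse` (the names `parse_without_return`, `parse_with_return`, and `drain` are hypothetical and not part of the PR). In a generator, execution resumes after `yield {}` when the consumer asks for the next item, so without `return` the code after the `except` block runs while `body_json` is unbound. Python raises `UnboundLocalError` in that case, which is a subclass of `NameError`.

```python
import json
from typing import Any, Generator, List, MutableMapping, Optional, Tuple, Union

Record = MutableMapping[str, Any]
Raw = Union[str, bytes, bytearray]


def parse_without_return(data: Raw) -> Generator[Record, None, None]:
    """Stand-in mirroring the control flow in the diff: no return after the yield."""
    try:
        body_json = json.loads(data)
    except json.JSONDecodeError:
        yield {}
    # Reached even after a decode failure, where body_json was never assigned.
    if not isinstance(body_json, list):
        body_json = [body_json]
    if len(body_json) == 0:
        yield {}
    else:
        yield from body_json


def parse_with_return(data: Raw) -> Generator[Record, None, None]:
    """Same logic with the suggested return, which ends the generator early."""
    try:
        body_json = json.loads(data)
    except json.JSONDecodeError:
        yield {}
        return
    if not isinstance(body_json, list):
        body_json = [body_json]
    if len(body_json) == 0:
        yield {}
    else:
        yield from body_json


def drain(gen: Generator[Record, None, None]) -> Tuple[List[Record], Optional[Exception]]:
    """Consume a generator fully and report any NameError raised along the way."""
    records: List[Record] = []
    try:
        for record in gen:
            records.append(record)
    except NameError as exc:
        return records, exc
    return records, None
```

A caller that only pulls the first item with `next()` would not see the error; it surfaces when the generator is consumed to the end, as `drain` does here.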

        if not isinstance(body_json, list):
            body_json = [body_json]
        if len(body_json) == 0:
            yield {}
        else:
            yield from body_json
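
The `Parser` interface above is what a `CustomParser` class is expected to subclass. As a rough illustration of what such a subclass could look like, here is a sketch with a hypothetical `CsvParser`; it is not part of the PR. The `Parser` base class is re-declared locally so the example is self-contained, and unlike the class in the diff it inherits from `ABC` so that `@abstractmethod` is actually enforced.

```python
import csv
import io
from abc import ABC, abstractmethod
from dataclasses import InitVar, dataclass
from typing import Any, Generator, Mapping, MutableMapping, Union


@dataclass
class Parser(ABC):
    """Local stand-in for the Parser base class shown in the diff."""

    @abstractmethod
    def parse(
        self, data: Union[str, bytes, bytearray]
    ) -> Generator[MutableMapping[str, Any], None, None]:
        ...


@dataclass
class CsvParser(Parser):
    """Hypothetical custom parser: one record per CSV row, keyed by the header row."""

    parameters: InitVar[Mapping[str, Any]]
    encoding: str = "utf-8"

    def parse(
        self, data: Union[str, bytes, bytearray]
    ) -> Generator[MutableMapping[str, Any], None, None]:
        # Normalise bytes-like input to text before handing it to the csv module.
        text = data if isinstance(data, str) else bytes(data).decode(self.encoding)
        yield from csv.DictReader(io.StringIO(text))
```

In a manifest, a class like this would presumably be referenced through `class_name` in a `CustomParser` definition, following the `source_<name>.<package>.<class_name>` format from the schema.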
54 changes: 54 additions & 0 deletions airbyte_cdk/sources/declarative/decoders/zipfile_decoder.py
@@ -0,0 +1,54 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

import gzip
import io
import logging
import zipfile
from dataclasses import InitVar, dataclass
from typing import Any, Generator, Mapping, MutableMapping, Optional

import requests

from airbyte_cdk.sources.declarative.decoders import Decoder
from airbyte_cdk.sources.declarative.decoders.parsers import JsonParser, Parser

logger = logging.getLogger("airbyte")


@dataclass
class ZipfileDecoder(Decoder):
    parameters: InitVar[Mapping[str, Any]]
    parser: Optional[Parser] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._parser = (
            self.parser(parameters=parameters) if self.parser else JsonParser(parameters=parameters)
        )
Comment on lines +25 to +28
Contributor

🛠️ Refactor suggestion

Check if parser is already an instance?

In the __post_init__ method, should we verify if self.parser is already an instance of Parser before attempting to instantiate it? This might prevent unintended behavior if an instance is passed instead of a class. Wdyt?

Consider modifying the initialization:

        def __post_init__(self, parameters: Mapping[str, Any]) -> None:
-           self._parser = (
-               self.parser(parameters=parameters) if self.parser else JsonParser(parameters=parameters)
-           )
+           if isinstance(self.parser, Parser):
+               self._parser = self.parser
+           elif self.parser:
+               self._parser = self.parser(parameters=parameters)
+           else:
+               self._parser = JsonParser(parameters=parameters)

Contributor Author

The input is based on the model_to_component.py factory method and therefore we can assume it will be an uninstantiated parser.
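
For reference, both forms appear in this diff: `create_json_parser` in the factory hunk returns `JsonParser(parameters={})`, which is an instance, while the unit test passes the class `JsonParser`. A minimal sketch of a resolver that tolerates either form is shown below. The helper name `resolve_parser` is hypothetical, and `Parser` and `JsonParser` are local stand-ins with the parsing logic omitted.

```python
from dataclasses import InitVar, dataclass
from typing import Any, Mapping, Optional, Type, Union


@dataclass
class Parser:
    """Local stand-in for the Parser base class."""

    parameters: InitVar[Mapping[str, Any]]


@dataclass
class JsonParser(Parser):
    """Local stand-in for JsonParser; parsing logic omitted."""


def resolve_parser(
    parser: Optional[Union[Parser, Type[Parser]]],
    parameters: Mapping[str, Any],
) -> Parser:
    """Hypothetical helper: accept a parser instance, a parser class, or None."""
    if isinstance(parser, Parser):
        # Already constructed (for example by a factory method): use it as-is.
        return parser
    if parser is not None:
        # A class was passed: instantiate it with the decoder's parameters.
        return parser(parameters=parameters)
    # Nothing configured: fall back to the JSON parser.
    return JsonParser(parameters=parameters)
```

Checking `isinstance` first matters because calling an already-built dataclass instance would raise `TypeError`, since instances are not callable.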


    def is_stream_response(self) -> bool:
        return False

    def decode(
        self, response: requests.Response
    ) -> Generator[MutableMapping[str, Any], None, None]:
        try:
            zip_file = zipfile.ZipFile(io.BytesIO(response.content))
        except zipfile.BadZipFile as e:
            logger.exception(e)
            logger.error(
                f"Received an invalid zip file in response to URL: {response.request.url}. "
                f"The size of the response body is: {len(response.content)}"
            )
            yield {}

Comment on lines +36 to +45
Contributor

⚠️ Potential issue

Return after handling a bad ZIP file exception?

When a BadZipFile exception occurs, we're yielding an empty dictionary but continuing execution. Should we add a return statement after yielding to prevent further processing since the ZIP file is invalid? This might avoid unnecessary iterations. Wdyt?

Here's a suggested change:

            except zipfile.BadZipFile as e:
                logger.exception(e)
                logger.error(
                    f"Received an invalid zip file in response to URL: {response.request.url}. "
                    f"The size of the response body is: {len(response.content)}"
                )
                yield {}
+               return
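
To make the concern concrete: if `zipfile.ZipFile(...)` raises, `zip_file` is never bound, so any later loop over `zip_file.namelist()` would fail once the generator is resumed. The sketch below is a reduced stand-in for `decode()` with the early return in place. The function name `list_members` and the record shape it yields are hypothetical, chosen only so the example runs without `requests`.

```python
import io
import zipfile
from typing import Any, Generator, MutableMapping


def list_members(content: bytes) -> Generator[MutableMapping[str, Any], None, None]:
    """Hypothetical stand-in for decode(): yields one record per archive member."""
    try:
        zip_file = zipfile.ZipFile(io.BytesIO(content))
    except zipfile.BadZipFile:
        yield {}
        return  # zip_file is unbound here, so stop before the loop below
    with zip_file:
        for name in zip_file.namelist():
            yield {"name": name, "size": zip_file.getinfo(name).file_size}
```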

Contributor Author

No thanks

        for gzip_filename in zip_file.namelist():
            with zip_file.open(gzip_filename) as file:
                try:
                    for data in gzip.open(file):
                        yield from self._parser.parse(data)
                except gzip.BadGzipFile as e:
                    logger.exception(e)
                    logger.error(f"Fail to read contents of zipped response: {e}")
                    yield {}
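
The loop above expects a layered payload: a zip archive whose members are gzip streams, with each gzip stream iterated line by line and every line handed to the parser separately. The sketch below builds and reads such a payload in memory. It assumes newline-delimited JSON inside the gzip member; the helper names and the member name `data.jsonl.gz` are hypothetical.

```python
import gzip
import io
import json
import zipfile
from typing import Any, Dict, Generator, List


def build_payload(records: List[Dict[str, Any]], member_name: str = "data.jsonl.gz") -> bytes:
    """Build a zip archive holding one gzip-compressed, newline-delimited JSON member."""
    lines = "\n".join(json.dumps(record) for record in records).encode("utf-8")
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, mode="w") as archive:
        archive.writestr(member_name, gzip.compress(lines))
    return buffer.getvalue()


def read_payload(payload: bytes) -> Generator[Dict[str, Any], None, None]:
    """Mirror the nested loops in decode(): zip member, then gzip stream, then one line at a time."""
    with zipfile.ZipFile(io.BytesIO(payload)) as archive:
        for name in archive.namelist():
            with archive.open(name) as member, gzip.open(member) as lines:
                for line in lines:
                    yield json.loads(line)
```

Because the gzip stream is split on newlines, a pretty-printed JSON document spanning several lines would reach the parser in fragments rather than as one document.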
Comment on lines +46 to +54
Contributor

🛠️ Refactor suggestion

Handle non-gzip files within the ZIP archive?

I noticed that the code assumes each file inside the ZIP archive is gzip-compressed. Would it be beneficial to check the file type before attempting to decompress with gzip.open, in case some files aren't gzip-compressed? This could prevent potential errors when processing the ZIP contents. Wdyt?

Consider updating the code like this:

        for filename in zip_file.namelist():
            with zip_file.open(filename) as file:
+               if filename.endswith('.gz'):
                    try:
+                       with gzip.open(file) as decompressed_file:
+                           data = decompressed_file.read()
+                           yield from self._parser.parse(data)
                    except gzip.BadGzipFile as e:
                        logger.exception(e)
                        logger.error(f"Fail to read gzip file {filename}: {e}")
                        yield {}
+               else:
+                   data = file.read()
+                   yield from self._parser.parse(data)
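
One caveat with a filename-suffix check: the unit test in this PR writes gzip-compressed bytes into a member named `data.json`, so an `endswith('.gz')` test would route that member down the plain branch. An alternative, swapped in here for illustration only, is to sniff the two-byte gzip magic number (`0x1f 0x8b`) instead of relying on the name. The helper `iter_member_bytes` is hypothetical, and it reads each member fully into memory, which may not suit large archives.

```python
import gzip
import io
import zipfile
from typing import Generator

GZIP_MAGIC = b"\x1f\x8b"  # first two bytes of every gzip stream


def iter_member_bytes(content: bytes) -> Generator[bytes, None, None]:
    """Hypothetical helper: yield each member's payload, gunzipping only gzip members."""
    with zipfile.ZipFile(io.BytesIO(content)) as archive:
        for name in archive.namelist():
            raw = archive.read(name)
            # Decide by content rather than by file extension.
            yield gzip.decompress(raw) if raw[:2] == GZIP_MAGIC else raw
```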

@@ -741,6 +741,27 @@ class Config:
    parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class JsonParser(BaseModel):
    class Config:
        extra = Extra.allow

    type: Literal["JsonParser"]


class CustomParser(BaseModel):
    class Config:
        extra = Extra.allow

    type: Literal["CustomParser"]
    class_name: str = Field(
        ...,
        description="Fully-qualified name of the class that will be implementing the custom decoding. Has to be a sub class of Parser. The format is `source_<name>.<package>.<class_name>`.",
        examples=["source_rivendell.components.ElvishParser"],
        title="Class Name",
    )
    parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class MinMaxDatetime(BaseModel):
    type: Literal["MinMaxDatetime"]
    datetime: str = Field(
@@ -1468,6 +1489,18 @@ class SessionTokenRequestApiKeyAuthenticator(BaseModel):
    )


class ZipfileDecoder(BaseModel):
    class Config:
        extra = Extra.allow

    type: Literal["ZipfileDecoder"]
    parser: Optional[Union[JsonParser, CustomParser]] = Field(
        None,
        description="Parser to parse the decompressed data from the zipfile(s).",
        title="Parser",
    )


class ListPartitionRouter(BaseModel):
    type: Literal["ListPartitionRouter"]
    cursor_field: str = Field(
@@ -66,7 +66,9 @@
    JsonlDecoder,
    PaginationDecoderDecorator,
    XmlDecoder,
    ZipfileDecoder,
)
from airbyte_cdk.sources.declarative.decoders.parsers import JsonParser
from airbyte_cdk.sources.declarative.extractors import (
    DpathExtractor,
    RecordFilter,
@@ -224,6 +226,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    JsonlDecoder as JsonlDecoderModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    JsonParser as JsonParserModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    JwtAuthenticator as JwtAuthenticatorModel,
)
@@ -316,6 +321,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    XmlDecoder as XmlDecoderModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    ZipfileDecoder as ZipfileDecoderModel,
)
from airbyte_cdk.sources.declarative.partition_routers import (
    CartesianProductStreamSlicer,
    ListPartitionRouter,
@@ -470,6 +478,7 @@ def _init_mappings(self) -> None:
            InlineSchemaLoaderModel: self.create_inline_schema_loader,
            JsonDecoderModel: self.create_json_decoder,
            JsonlDecoderModel: self.create_jsonl_decoder,
            JsonParserModel: self.create_json_parser,
            GzipJsonDecoderModel: self.create_gzipjson_decoder,
            KeysToLowerModel: self.create_keys_to_lower_transformation,
            IterableDecoderModel: self.create_iterable_decoder,
@@ -505,6 +514,7 @@ def _init_mappings(self) -> None:
            ConfigComponentsResolverModel: self.create_config_components_resolver,
            StreamConfigModel: self.create_stream_config,
            ComponentMappingDefinitionModel: self.create_components_mapping_definition,
            ZipfileDecoderModel: self.create_zipfile_decoder,
        }

        # Needed for the case where we need to perform a second parse on the fields of a custom component
@@ -1682,6 +1692,20 @@ def create_gzipjson_decoder(
    ) -> GzipJsonDecoder:
        return GzipJsonDecoder(parameters={}, encoding=model.encoding)

    def create_zipfile_decoder(
        self, model: ZipfileDecoderModel, config: Config, **kwargs: Any
    ) -> ZipfileDecoder:
        parser = (
            self._create_component_from_model(model=model.parser, config=config)
            if model.parser
            else None
        )
        return ZipfileDecoder(parameters={}, parser=parser)

    @staticmethod
    def create_json_parser(model: JsonParserModel, config: Config, **kwargs: Any) -> JsonParser:
        return JsonParser(parameters={})

    @staticmethod
    def create_json_file_schema_loader(
        model: JsonFileSchemaLoaderModel, config: Config, **kwargs: Any
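
The factory changes above follow a registry pattern: each model type maps to a `create_*` method, and `create_zipfile_decoder` recurses into the nested `parser` definition before building the decoder. The sketch below shows that shape in a much-reduced form. It is an illustration under assumptions, not the CDK's implementation: `MiniFactory` is hypothetical, it keys on the manifest `type` string with plain dicts rather than on pydantic model classes, and the component classes are local stand-ins.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, Mapping, Optional


@dataclass
class JsonParser:
    """Stand-in component; parsing logic omitted."""


@dataclass
class ZipfileDecoder:
    """Stand-in component holding an optional, already-built parser."""

    parser: Optional[Any] = None


class MiniFactory:
    """Hypothetical, much-reduced factory keyed on the manifest `type` string."""

    def __init__(self) -> None:
        self._creators: Dict[str, Callable[[Mapping[str, Any]], Any]] = {
            "JsonParser": self.create_json_parser,
            "ZipfileDecoder": self.create_zipfile_decoder,
        }

    def create(self, definition: Mapping[str, Any]) -> Any:
        """Look up the creator for the definition's type and delegate to it."""
        try:
            creator = self._creators[definition["type"]]
        except KeyError:
            raise ValueError(
                f"Unsupported component type: {definition.get('type')!r}"
            ) from None
        return creator(definition)

    def create_json_parser(self, definition: Mapping[str, Any]) -> JsonParser:
        return JsonParser()

    def create_zipfile_decoder(self, definition: Mapping[str, Any]) -> ZipfileDecoder:
        # Build the nested parser first, if one is configured, then the decoder.
        nested = definition.get("parser")
        return ZipfileDecoder(parser=self.create(nested) if nested else None)
```

With this shape, the decoder receives a constructed parser object rather than a class, which is the same hand-off that `create_zipfile_decoder` performs through `_create_component_from_model`.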
3 changes: 3 additions & 0 deletions unit_tests/sources/declarative/decoders/parsers/__init__.py
@@ -0,0 +1,3 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
35 changes: 35 additions & 0 deletions unit_tests/sources/declarative/decoders/parsers/test_parsers.py
@@ -0,0 +1,35 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

import json

import pytest

from airbyte_cdk.sources.declarative.decoders.parsers import JsonParser


@pytest.mark.parametrize(
    "raw_data, expected",
    [
        (json.dumps({"data-type": "string"}), {"data-type": "string"}),
        (json.dumps({"data-type": "bytes"}).encode("utf-8"), {"data-type": "bytes"}),
        (
            bytearray(json.dumps({"data-type": "bytearray"}).encode("utf-8")),
            {"data-type": "bytearray"},
        ),
        (json.dumps([{"id": 1}, {"id": 2}]), [{"id": 1}, {"id": 2}]),
    ],
    ids=[
        "test_with_str",
        "test_with_bytes",
        "test_with_bytearray",
        "test_with_string_data_containing_list",
    ],
)
def test_json_parser_with_valid_data(raw_data, expected):
    for i, actual in enumerate(JsonParser().parse(raw_data)):
        if isinstance(expected, list):
            assert actual == expected[i]
        else:
            assert actual == expected
43 changes: 43 additions & 0 deletions unit_tests/sources/declarative/decoders/test_zipfile_decoder.py
@@ -0,0 +1,43 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
import gzip
import json
import zipfile
from io import BytesIO
from typing import Union

import pytest
import requests

from airbyte_cdk.sources.declarative.decoders import ZipfileDecoder
from airbyte_cdk.sources.declarative.decoders.parsers import JsonParser


def create_zip_from_dict(data: Union[dict, list]):
    zip_buffer = BytesIO()
    with zipfile.ZipFile(zip_buffer, mode="w") as zip_file:
        zip_file.writestr("data.json", data)
    zip_buffer.seek(0)
    return zip_buffer.getvalue()


@pytest.mark.parametrize(
    "json_data",
    [
        {"test": "test"},
        [{"id": 1}, {"id": 2}],
    ],
)
def test_zipfile_decoder_with_valid_response(requests_mock, json_data):
    zipfile_decoder = ZipfileDecoder(parameters={}, parser=JsonParser)
    compressed_data = gzip.compress(json.dumps(json_data).encode())
    zipped_data = create_zip_from_dict(compressed_data)
    requests_mock.register_uri("GET", "https://airbyte.io/", content=zipped_data)
    response = requests.get("https://airbyte.io/")

    if isinstance(json_data, list):
        for i, actual in enumerate(zipfile_decoder.decode(response=response)):
            assert actual == json_data[i]
    else:
        assert next(zipfile_decoder.decode(response=response)) == json_data