-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Adds ZipfileDecoder
component
#169
base: main
Are you sure you want to change the base?
Changes from all commits
e68f36f
a8a7bb3
254f877
8df239a
8c7d5f8
92574df
6e4b376
ab3f404
82a15c9
49d0ec8
96ec874
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
# | ||
# Copyright (c) 2024 Airbyte, Inc., all rights reserved. | ||
# | ||
|
||
from airbyte_cdk.sources.declarative.decoders.parsers.parsers import Parser, JsonParser | ||
|
||
__all__ = ["Parser", "JsonParser"] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
# | ||
# Copyright (c) 2024 Airbyte, Inc., all rights reserved. | ||
# | ||
|
||
import json | ||
import logging | ||
from abc import abstractmethod | ||
from dataclasses import InitVar, dataclass | ||
from typing import Any, Generator, Mapping, MutableMapping, Union | ||
|
||
logger = logging.getLogger("airbyte")


@dataclass
class Parser:
    """
    Parser strategy to convert str, bytes, or bytearray data into MutableMapping[str, Any].

    Subclasses implement `parse` as a generator that yields one mapping per record.
    """

    # NOTE: this class does not derive from abc.ABC, so @abstractmethod only documents intent;
    # Python will not block instantiation of a subclass that forgets to override `parse`.
    @abstractmethod
    def parse(
        self, data: Union[str, bytes, bytearray]
    ) -> Generator[MutableMapping[str, Any], None, None]:
        """
        Convert raw data into records.

        :param data: the raw payload to parse
        :return: a generator of parsed records
        """
        pass


@dataclass
class JsonParser(Parser):
    """
    Parser strategy for converting JSON-structure str, bytes, or bytearray data into MutableMapping[str, Any].
    """

    parameters: InitVar[Mapping[str, Any]]

    def parse(
        self, data: Union[str, bytes, bytearray]
    ) -> Generator[MutableMapping[str, Any], None, None]:
        """
        Parse a JSON document and yield its records.

        A top-level JSON array yields each of its elements; any other JSON value is yielded as a
        single record. A single empty mapping is yielded when the array is empty or when the
        data is not valid JSON (a warning is logged in the latter case).

        :param data: the JSON payload, as accepted by `json.loads`
        :return: a generator of parsed records
        """
        try:
            body_json = json.loads(data)
        except json.JSONDecodeError:
            logger.warning(f"Data cannot be parsed into json: {data=}")
            yield {}
            # Stop here: `body_json` was never bound, so falling through would raise
            # UnboundLocalError as soon as the consumer asks for the next record.
            return

        if not isinstance(body_json, list):
            body_json = [body_json]
        if not body_json:
            yield {}
        else:
            yield from body_json
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,54 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||
# | ||||||||||||||||||||||||||||||||||||||||||||||||
# Copyright (c) 2024 Airbyte, Inc., all rights reserved. | ||||||||||||||||||||||||||||||||||||||||||||||||
# | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
import gzip
import io
import logging
import zipfile
from dataclasses import InitVar, dataclass
from typing import Any, Generator, Mapping, MutableMapping, Optional, Type, Union

import requests

from airbyte_cdk.sources.declarative.decoders import Decoder
from airbyte_cdk.sources.declarative.decoders.parsers import JsonParser, Parser
|
||||||||||||||||||||||||||||||||||||||||||||||||
logger = logging.getLogger("airbyte") | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
@dataclass
class ZipfileDecoder(Decoder):
    """
    Decoder for responses whose body is a ZIP archive of gzip-compressed members.

    Each archive member is opened with gzip and every decompressed line is handed to the
    configured parser; the records produced by the parser are yielded from `decode`.
    """

    parameters: InitVar[Mapping[str, Any]]
    # Either a Parser class (instantiated with `parameters`) or an already-built Parser
    # instance. JsonParser is used when nothing is supplied.
    parser: Optional[Union[Parser, Type[Parser]]] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """
        Resolve the `parser` field into the parser instance used by `decode`.

        :param parameters: passed to the parser class when one has to be instantiated
        """
        if isinstance(self.parser, Parser):
            # Already an instance: use it as-is rather than trying to call it.
            self._parser = self.parser
        elif self.parser:
            self._parser = self.parser(parameters=parameters)
        else:
            self._parser = JsonParser(parameters=parameters)

    def is_stream_response(self) -> bool:
        """Return False: `decode` reads the whole body through `response.content`."""
        return False

    def decode(
        self, response: requests.Response
    ) -> Generator[MutableMapping[str, Any], None, None]:
        """
        Yield the records contained in a zipped response.

        A single empty mapping is yielded when the body is not a valid ZIP archive, and one
        empty mapping is yielded for each archive member that is not valid gzip data.

        :param response: the HTTP response whose body is the ZIP archive
        :return: a generator of parsed records
        """
        try:
            zip_file = zipfile.ZipFile(io.BytesIO(response.content))
        except zipfile.BadZipFile as e:
            logger.exception(e)
            logger.error(
                f"Received an invalid zip file in response to URL: {response.request.url}. "
                f"The size of the response body is: {len(response.content)}"
            )
            yield {}
            # Stop here: `zip_file` was never bound, so continuing would raise
            # UnboundLocalError when the consumer asks for the next record.
            return

        # Context managers make sure the archive and every gzip reader are closed, even if the
        # consumer stops iterating early.
        with zip_file:
            for gzip_filename in zip_file.namelist():
                with zip_file.open(gzip_filename) as file:
                    try:
                        with gzip.open(file) as gzip_file:
                            for data in gzip_file:
                                yield from self._parser.parse(data)
                    except gzip.BadGzipFile as e:
                        logger.exception(e)
                        logger.error(f"Fail to read contents of zipped response: {e}")
                        yield {}
Comment on lines
+46
to
+54
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion: Handle non-gzip files within the ZIP archive? I noticed that the code assumes each file inside the ZIP archive is gzip-compressed. Would it be beneficial to check the file type before attempting to decompress with gzip? Consider updating the code like this: for filename in zip_file.namelist():
with zip_file.open(filename) as file:
+ if filename.endswith('.gz'):
try:
+ with gzip.open(file) as decompressed_file:
+ data = decompressed_file.read()
+ yield from self._parser.parse(data)
except gzip.BadGzipFile as e:
logger.exception(e)
logger.error(f"Fail to read gzip file {filename}: {e}")
yield {}
+ else:
+ data = file.read()
+ yield from self._parser.parse(data) 📝 Committable suggestion
Suggested change
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
# | ||
# Copyright (c) 2024 Airbyte, Inc., all rights reserved. | ||
# |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
# | ||
# Copyright (c) 2024 Airbyte, Inc., all rights reserved. | ||
# | ||
|
||
import json | ||
|
||
import pytest | ||
|
||
from airbyte_cdk.sources.declarative.decoders.parsers import JsonParser | ||
|
||
|
||
@pytest.mark.parametrize(
    "raw_data, expected",
    [
        (json.dumps({"data-type": "string"}), {"data-type": "string"}),
        (json.dumps({"data-type": "bytes"}).encode("utf-8"), {"data-type": "bytes"}),
        (
            bytearray(json.dumps({"data-type": "bytearray"}).encode("utf-8")),
            {"data-type": "bytearray"},
        ),
        (json.dumps([{"id": 1}, {"id": 2}]), [{"id": 1}, {"id": 2}]),
    ],
    ids=[
        "test_with_str",
        "test_with_bytes",
        "test_with_bytearray",
        "test_with_string_data_containing_list",
    ],
)
def test_json_parser_with_valid_data(raw_data, expected):
    """JsonParser yields one record per JSON object for str, bytes and bytearray input."""
    # `parameters` is a required InitVar on JsonParser, so it must be passed explicitly.
    parser = JsonParser(parameters={})
    expected_records = expected if isinstance(expected, list) else [expected]
    # Compare the full output so that a parser yielding too few (or zero) records fails
    # instead of silently skipping the assertions.
    assert list(parser.parse(raw_data)) == expected_records
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
# | ||
# Copyright (c) 2024 Airbyte, Inc., all rights reserved. | ||
# | ||
import gzip | ||
import json | ||
import zipfile | ||
from io import BytesIO | ||
from typing import Union | ||
|
||
import pytest | ||
import requests | ||
|
||
from airbyte_cdk.sources.declarative.decoders import ZipfileDecoder | ||
from airbyte_cdk.sources.declarative.decoders.parsers import JsonParser | ||
|
||
|
||
def create_zip_from_dict(data: Union[str, bytes]) -> bytes:
    """
    Build an in-memory ZIP archive with a single member named "data.json".

    :param data: the content written verbatim as the archive member (the decoder test passes
        gzip-compressed JSON bytes)
    :return: the raw bytes of the ZIP archive
    """
    zip_buffer = BytesIO()
    with zipfile.ZipFile(zip_buffer, mode="w") as zip_file:
        zip_file.writestr("data.json", data)
    # getvalue() returns the whole buffer regardless of the current stream position.
    return zip_buffer.getvalue()
|
||
|
||
@pytest.mark.parametrize(
    "json_data",
    [
        {"test": "test"},
        [{"id": 1}, {"id": 2}],
    ],
)
def test_zipfile_decoder_with_valid_response(requests_mock, json_data):
    """ZipfileDecoder yields every record of a gzip-compressed JSON member of a ZIP response."""
    zipfile_decoder = ZipfileDecoder(parameters={}, parser=JsonParser)
    compressed_data = gzip.compress(json.dumps(json_data).encode())
    zipped_data = create_zip_from_dict(compressed_data)
    requests_mock.register_uri("GET", "https://airbyte.io/", content=zipped_data)
    response = requests.get("https://airbyte.io/")

    expected_records = json_data if isinstance(json_data, list) else [json_data]
    # Compare the full output so that missing or extra records fail the test instead of
    # being skipped by a loop that never runs.
    assert list(zipfile_decoder.decode(response=response)) == expected_records
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Prevent NameError after JSON decoding fails?
When a `JSONDecodeError` occurs, `body_json` isn't defined, but the code continues and may reference it later. Should we add a `return` statement after yielding `{}` to avoid potential `NameError` exceptions? Wdyt?
Consider updating the exception handling:
except json.JSONDecodeError: logger.warning(f"Data cannot be parsed into json: {data=}") yield {} + return
📝 Committable suggestion