feat: add download_decoder + download_extractor (#50)
Signed-off-by: Artem Inzhyyants <[email protected]>
artem1205 authored Dec 3, 2024
1 parent 9587d4e commit c90eea1
Showing 8 changed files with 117 additions and 8 deletions.
29 changes: 29 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
@@ -1331,6 +1331,19 @@ definitions:
$parameters:
type: object
additionalProperties: true
ResponseToFileExtractor:
title: CSV To File Extractor
description: A record extractor designed for handling large responses that may exceed memory limits (to prevent OOM issues). It downloads a CSV file to disk, reads the data from disk, and deletes the file once it has been fully processed.
type: object
required:
- type
properties:
type:
type: string
enum: [ResponseToFileExtractor]
$parameters:
type: object
additionalProperties: true
ExponentialBackoffStrategy:
title: Exponential Backoff
description: Backoff strategy with an exponential backoff interval. The interval is defined as factor * 2^attempt_count.
@@ -2680,6 +2693,12 @@ definitions:
anyOf:
- "$ref": "#/definitions/CustomRecordExtractor"
- "$ref": "#/definitions/DpathExtractor"
download_extractor:
description: Responsible for fetching the records from provided urls.
anyOf:
- "$ref": "#/definitions/CustomRecordExtractor"
- "$ref": "#/definitions/DpathExtractor"
- "$ref": "#/definitions/ResponseToFileExtractor"
creation_requester:
description: Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.
anyOf:
@@ -2734,6 +2753,16 @@ definitions:
- "$ref": "#/definitions/IterableDecoder"
- "$ref": "#/definitions/XmlDecoder"
- "$ref": "#/definitions/GzipJsonDecoder"
download_decoder:
title: Download Decoder
description: Component decoding the download response so records can be extracted.
anyOf:
- "$ref": "#/definitions/CustomDecoder"
- "$ref": "#/definitions/JsonDecoder"
- "$ref": "#/definitions/JsonlDecoder"
- "$ref": "#/definitions/IterableDecoder"
- "$ref": "#/definitions/XmlDecoder"
- "$ref": "#/definitions/GzipJsonDecoder"
$parameters:
type: object
additionalProperties: true
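The two new AsyncRetriever properties above (download_extractor and download_decoder) can be referenced from a low-code manifest. Below is a minimal, illustrative fragment expressed as a Python dict, the form the CDK's factory tests use; every key other than the two new download_* entries is omitted or assumed:

    # Illustrative AsyncRetriever fragment exercising the fields added by this commit.
    # The rest of the retriever definition (requesters, status mapping, record
    # selector, urls_extractor, ...) is left out for brevity.
    async_retriever_fragment = {
        "type": "AsyncRetriever",
        "download_extractor": {"type": "ResponseToFileExtractor"},
        "download_decoder": {"type": "JsonDecoder"},
    }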
@@ -6,6 +6,7 @@
import uuid
import zlib
from contextlib import closing
from dataclasses import InitVar, dataclass
from typing import Any, Dict, Iterable, Mapping, Optional, Tuple

import pandas as pd
@@ -19,6 +20,7 @@
DOWNLOAD_CHUNK_SIZE: int = 1024 * 10


@dataclass
class ResponseToFileExtractor(RecordExtractor):
"""
This class is used when having very big HTTP responses (usually streamed) which would require too much memory so we use disk space as
@@ -28,7 +30,9 @@ class ResponseToFileExtractor(RecordExtractor):
a first iteration so we will only support CSV parsing using pandas as salesforce and sendgrid were doing.
"""

def __init__(self) -> None:
parameters: InitVar[Mapping[str, Any]]

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self.logger = logging.getLogger("airbyte")

def _get_response_encoding(self, headers: Dict[str, Any]) -> str:
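With the dataclass conversion above, ResponseToFileExtractor now takes a parameters mapping (an InitVar) rather than a no-argument constructor. A minimal usage sketch, assuming the class is importable from the declarative extractors package and using a placeholder URL:

    import requests

    from airbyte_cdk.sources.declarative.extractors import ResponseToFileExtractor

    # The InitVar is required after this change; an empty mapping mirrors the
    # CDK's own call sites, e.g. ResponseToFileExtractor({}).
    extractor = ResponseToFileExtractor(parameters={})

    # Placeholder endpoint: the extractor streams the CSV response to a temporary
    # file, yields the parsed rows as records, and deletes the file afterwards.
    response = requests.get("https://example.com/export.csv", stream=True)
    for record in extractor.extract_records(response):
        print(record)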
@@ -567,6 +567,11 @@ class DpathExtractor(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class ResponseToFileExtractor(BaseModel):
type: Literal["ResponseToFileExtractor"]
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class ExponentialBackoffStrategy(BaseModel):
type: Literal["ExponentialBackoffStrategy"]
factor: Optional[Union[float, str]] = Field(
@@ -1798,6 +1803,9 @@ class AsyncRetriever(BaseModel):
...,
description="Responsible for fetching the final result `urls` provided by the completed / finished / ready async job.",
)
download_extractor: Optional[
Union[CustomRecordExtractor, DpathExtractor, ResponseToFileExtractor]
] = Field(None, description="Responsible for fetching the records from provided urls.")
creation_requester: Union[CustomRequester, HttpRequester] = Field(
...,
description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.",
@@ -1848,6 +1856,20 @@ class AsyncRetriever(BaseModel):
description="Component decoding the response so records can be extracted.",
title="Decoder",
)
download_decoder: Optional[
Union[
CustomDecoder,
JsonDecoder,
JsonlDecoder,
IterableDecoder,
XmlDecoder,
GzipJsonDecoder,
]
] = Field(
None,
description="Component decoding the download response so records can be extracted.",
title="Download Decoder",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


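The generated pydantic model mirrors the YAML definition: only type is required, and $parameters (aliased to parameters) stays optional. A small sketch using pydantic v1's parse_obj:

    from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
        ResponseToFileExtractor,
    )

    # `type` is the only required field; `$parameters` is optional.
    model = ResponseToFileExtractor.parse_obj({"type": "ResponseToFileExtractor"})
    assert model.type == "ResponseToFileExtractor"
    assert model.parameters is None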
@@ -270,6 +270,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
RequestPath as RequestPathModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
ResponseToFileExtractor as ResponseToFileExtractorModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
SelectiveAuthenticator as SelectiveAuthenticatorModel,
)
@@ -427,6 +430,7 @@ def _init_mappings(self) -> None:
DefaultErrorHandlerModel: self.create_default_error_handler,
DefaultPaginatorModel: self.create_default_paginator,
DpathExtractorModel: self.create_dpath_extractor,
ResponseToFileExtractorModel: self.create_response_to_file_extractor,
ExponentialBackoffStrategyModel: self.create_exponential_backoff_strategy,
SessionTokenAuthenticatorModel: self.create_session_token_authenticator,
HttpRequesterModel: self.create_http_requester,
@@ -1447,6 +1451,13 @@ def create_dpath_extractor(
parameters=model.parameters or {},
)

def create_response_to_file_extractor(
self,
model: ResponseToFileExtractorModel,
**kwargs: Any,
) -> ResponseToFileExtractor:
return ResponseToFileExtractor(parameters=model.parameters or {})

@staticmethod
def create_exponential_backoff_strategy(
model: ExponentialBackoffStrategyModel, config: Config
@@ -2011,6 +2022,7 @@ def create_async_retriever(
model=model.record_selector,
config=config,
decoder=decoder,
name=name,
transformations=transformations,
client_side_incremental_sync=client_side_incremental_sync,
)
@@ -2028,16 +2040,36 @@
name=f"job polling - {name}",
)
job_download_components_name = f"job download - {name}"
download_decoder = (
self._create_component_from_model(model=model.download_decoder, config=config)
if model.download_decoder
else JsonDecoder(parameters={})
)
download_extractor = (
self._create_component_from_model(
model=model.download_extractor,
config=config,
decoder=download_decoder,
parameters=model.parameters,
)
if model.download_extractor
else DpathExtractor(
[],
config=config,
decoder=download_decoder,
parameters=model.parameters or {},
)
)
download_requester = self._create_component_from_model(
model=model.download_requester,
decoder=decoder,
decoder=download_decoder,
config=config,
name=job_download_components_name,
)
download_retriever = SimpleRetriever(
requester=download_requester,
record_selector=RecordSelector(
extractor=ResponseToFileExtractor(),
extractor=download_extractor,
name=name,
record_filter=None,
transformations=[],
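The factory changes above register the new model and, when no download_extractor is declared, fall back to a DpathExtractor over the download_decoder (itself defaulting to JsonDecoder). The extractor can also be built directly through the factory, mirroring the custom-extractor test added at the end of this diff; constructing a bare ModelToComponentFactory() here is an assumption:

    from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
        ResponseToFileExtractor as ResponseToFileExtractorModel,
    )
    from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
        ModelToComponentFactory,
    )

    factory = ModelToComponentFactory()

    # Routed through the new create_response_to_file_extractor mapping; only
    # $parameters (here: none) is forwarded to the component.
    extractor = factory.create_component(
        ResponseToFileExtractorModel, {"type": "ResponseToFileExtractor"}, {}
    )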
@@ -42,7 +42,7 @@ class AsyncHttpJobRepository(AsyncJobRepository):

job_timeout: Optional[timedelta] = None
record_extractor: RecordExtractor = field(
init=False, repr=False, default_factory=lambda: ResponseToFileExtractor()
init=False, repr=False, default_factory=lambda: ResponseToFileExtractor({})
)

def __post_init__(self) -> None:
@@ -14,7 +14,7 @@

class ResponseToFileExtractorTest(TestCase):
def setUp(self) -> None:
self._extractor = ResponseToFileExtractor()
self._extractor = ResponseToFileExtractor({})
self._http_mocker = requests_mock.Mocker()
self._http_mocker.__enter__()

@@ -76,7 +76,7 @@ def large_event_response_fixture():
@pytest.mark.limit_memory("20 MB")
def test_response_to_file_extractor_memory_usage(requests_mock, large_events_response):
lines_in_response, file_path = large_events_response
extractor = ResponseToFileExtractor()
extractor = ResponseToFileExtractor({})

url = "https://for-all-mankind.nasa.com/api/v1/users/users1"
requests_mock.get(url, body=open(file_path, "rb"))
@@ -4,11 +4,12 @@

# mypy: ignore-errors
import datetime
from typing import Any, Mapping
from typing import Any, Iterable, Mapping

import freezegun
import pendulum
import pytest
import requests

from airbyte_cdk import AirbyteTracedException
from airbyte_cdk.models import FailureType, Level
@@ -27,6 +28,7 @@
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.sources.declarative.decoders import JsonDecoder, PaginationDecoderDecorator
from airbyte_cdk.sources.declarative.extractors import DpathExtractor, RecordFilter, RecordSelector
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
from airbyte_cdk.sources.declarative.extractors.record_filter import (
ClientSideIncrementalRecordFilterDecorator,
)
@@ -47,6 +49,9 @@
from airbyte_cdk.sources.declarative.models import (
CustomPartitionRouter as CustomPartitionRouterModel,
)
from airbyte_cdk.sources.declarative.models import (
CustomRecordExtractor as CustomRecordExtractorModel,
)
from airbyte_cdk.sources.declarative.models import CustomSchemaLoader as CustomSchemaLoaderModel
from airbyte_cdk.sources.declarative.models import DatetimeBasedCursor as DatetimeBasedCursorModel
from airbyte_cdk.sources.declarative.models import DeclarativeStream as DeclarativeStreamModel
@@ -3271,3 +3276,20 @@ def test_create_concurrent_cursor_uses_min_max_datetime_format_if_defined():
"state_type": "date-range",
"legacy": {},
}


class CustomRecordExtractor(RecordExtractor):
def extract_records(
self,
response: requests.Response,
) -> Iterable[Mapping[str, Any]]:
yield from response.json()


def test_create_custom_record_extractor():
definition = {
"type": "CustomRecordExtractor",
"class_name": "unit_tests.sources.declarative.parsers.test_model_to_component_factory.CustomRecordExtractor",
}
component = factory.create_component(CustomRecordExtractorModel, definition, {})
assert isinstance(component, CustomRecordExtractor)
@@ -95,7 +95,7 @@ def setUp(self) -> None:
stream_response=True,
),
record_selector=RecordSelector(
extractor=ResponseToFileExtractor(),
extractor=ResponseToFileExtractor({}),
record_filter=None,
transformations=[],
schema_normalization=TypeTransformer(TransformConfig.NoTransform),