diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 3fcbbf34..3d378a28 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1750,6 +1750,45 @@ definitions: type: type: string enum: [XmlDecoder] + CustomDecoder: + title: Custom Decoder + description: Use this to implement custom decoder logic. + type: object + additionalProperties: true + required: + - type + - class_name + properties: + type: + type: string + enum: [CustomDecoder] + class_name: + title: Class Name + description: Fully-qualified name of the class that will be implementing the custom decoding. Has to be a sub class of Decoder. The format is `source_..`. + type: string + additionalProperties: true + examples: + - "source_amazon_ads.components.GzipJsonlDecoder" + $parameters: + type: object + additionalProperties: true + GzipJsonDecoder: + title: GzipJson Decoder + description: Use this if the response is Gzip compressed Json. + type: object + additionalProperties: true + required: + - type + properties: + type: + type: string + enum: [GzipJsonDecoder] + encoding: + type: string + default: utf-8 + $parameters: + type: object + additionalProperties: true ListPartitionRouter: title: List Partition Router description: A Partition router that specifies a list of attributes where each attribute describes a portion of the complete data set for a stream. During a sync, each value is iterated over and can be used as input to outbound API requests. @@ -2404,10 +2443,12 @@ definitions: title: Decoder description: Component decoding the response so records can be extracted. anyOf: + - "$ref": "#/definitions/CustomDecoder" - "$ref": "#/definitions/JsonDecoder" - "$ref": "#/definitions/JsonlDecoder" - "$ref": "#/definitions/IterableDecoder" - "$ref": "#/definitions/XmlDecoder" + - "$ref": "#/definitions/GzipJsonDecoder" $parameters: type: object additionalProperties: true @@ -2520,10 +2561,12 @@ definitions: title: Decoder description: Component decoding the response so records can be extracted. anyOf: + - "$ref": "#/definitions/CustomDecoder" - "$ref": "#/definitions/JsonDecoder" - "$ref": "#/definitions/JsonlDecoder" - "$ref": "#/definitions/IterableDecoder" - "$ref": "#/definitions/XmlDecoder" + - "$ref": "#/definitions/GzipJsonDecoder" $parameters: type: object additionalProperties: true diff --git a/airbyte_cdk/sources/declarative/decoders/__init__.py b/airbyte_cdk/sources/declarative/decoders/__init__.py index b67561e9..7452fe99 100644 --- a/airbyte_cdk/sources/declarative/decoders/__init__.py +++ b/airbyte_cdk/sources/declarative/decoders/__init__.py @@ -3,9 +3,9 @@ # from airbyte_cdk.sources.declarative.decoders.decoder import Decoder -from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder, JsonlDecoder, IterableDecoder +from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder, JsonlDecoder, IterableDecoder, GzipJsonDecoder from airbyte_cdk.sources.declarative.decoders.noop_decoder import NoopDecoder from airbyte_cdk.sources.declarative.decoders.pagination_decoder_decorator import PaginationDecoderDecorator from airbyte_cdk.sources.declarative.decoders.xml_decoder import XmlDecoder -__all__ = ["Decoder", "JsonDecoder", "JsonlDecoder", "IterableDecoder", "NoopDecoder", "PaginationDecoderDecorator", "XmlDecoder"] +__all__ = ["Decoder", "JsonDecoder", "JsonlDecoder", "IterableDecoder", "GzipJsonDecoder", "NoopDecoder", "PaginationDecoderDecorator", "XmlDecoder"] diff --git a/airbyte_cdk/sources/declarative/decoders/json_decoder.py b/airbyte_cdk/sources/declarative/decoders/json_decoder.py index 986bbd87..d04504af 100644 --- a/airbyte_cdk/sources/declarative/decoders/json_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/json_decoder.py @@ -1,14 +1,15 @@ # # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # - +import codecs import logging from dataclasses import InitVar, dataclass -from typing import Any, Generator, Mapping +from gzip import decompress +from typing import Any, Generator, Mapping, MutableMapping, List, Optional import requests from airbyte_cdk.sources.declarative.decoders.decoder import Decoder -from orjson import orjson +import orjson logger = logging.getLogger("airbyte") @@ -24,24 +25,32 @@ class JsonDecoder(Decoder): def is_stream_response(self) -> bool: return False - def decode(self, response: requests.Response) -> Generator[Mapping[str, Any], None, None]: + def decode( + self, response: requests.Response + ) -> Generator[MutableMapping[str, Any], None, None]: """ Given the response is an empty string or an emtpy list, the function will return a generator with an empty mapping. """ try: body_json = response.json() - if not isinstance(body_json, list): - body_json = [body_json] - if len(body_json) == 0: - yield {} - else: - yield from body_json + yield from self.parse_body_json(body_json) except requests.exceptions.JSONDecodeError: logger.warning( f"Response cannot be parsed into json: {response.status_code=}, {response.text=}" ) yield {} + @staticmethod + def parse_body_json( + body_json: MutableMapping[str, Any] | List[MutableMapping[str, Any]], + ) -> Generator[MutableMapping[str, Any], None, None]: + if not isinstance(body_json, list): + body_json = [body_json] + if len(body_json) == 0: + yield {} + else: + yield from body_json + @dataclass class IterableDecoder(Decoder): @@ -54,7 +63,9 @@ class IterableDecoder(Decoder): def is_stream_response(self) -> bool: return True - def decode(self, response: requests.Response) -> Generator[Mapping[str, Any], None, None]: + def decode( + self, response: requests.Response + ) -> Generator[MutableMapping[str, Any], None, None]: for line in response.iter_lines(): yield {"record": line.decode()} @@ -70,8 +81,30 @@ class JsonlDecoder(Decoder): def is_stream_response(self) -> bool: return True - def decode(self, response: requests.Response) -> Generator[Mapping[str, Any], None, None]: + def decode( + self, response: requests.Response + ) -> Generator[MutableMapping[str, Any], None, None]: # TODO???: set delimiter? usually it is `\n` but maybe it would be useful to set optional? # https://github.com/airbytehq/airbyte-internal-issues/issues/8436 for record in response.iter_lines(): yield orjson.loads(record) + + +@dataclass +class GzipJsonDecoder(JsonDecoder): + encoding: Optional[str] + + def __post_init__(self, parameters: Mapping[str, Any]) -> None: + if self.encoding: + try: + codecs.lookup(self.encoding) + except LookupError: + raise ValueError( + f"Invalid encoding '{self.encoding}'. Please check provided encoding" + ) + + def decode( + self, response: requests.Response + ) -> Generator[MutableMapping[str, Any], None, None]: + raw_string = decompress(response.content).decode(encoding=self.encoding or "utf-8") + yield from self.parse_body_json(orjson.loads(raw_string)) diff --git a/airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py b/airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py index 38546168..8000b187 100644 --- a/airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py +++ b/airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py @@ -4,7 +4,11 @@ from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration -from airbyte_cdk.sources.declarative.models import DatetimeBasedCursor, SubstreamPartitionRouter +from airbyte_cdk.sources.declarative.models import ( + DatetimeBasedCursor, + SubstreamPartitionRouter, + CustomIncrementalSync, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ParentStreamConfig @@ -32,7 +36,7 @@ class LegacyToPerPartitionStateMigration(StateMigration): def __init__( self, partition_router: SubstreamPartitionRouter, - cursor: DatetimeBasedCursor, + cursor: CustomIncrementalSync | DatetimeBasedCursor, config: Mapping[str, Any], parameters: Mapping[str, Any], ): @@ -64,7 +68,7 @@ def should_migrate(self, stream_state: Mapping[str, Any]) -> bool: return False # There is exactly one parent stream - number_of_parent_streams = len(self._partition_router.parent_stream_configs) + number_of_parent_streams = len(self._partition_router.parent_stream_configs) # type: ignore # custom partition will introduce this attribute if needed if number_of_parent_streams != 1: # There should be exactly one parent stream return False diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 43848eae..6b0f3ca9 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -687,6 +687,29 @@ class XmlDecoder(BaseModel): type: Literal["XmlDecoder"] +class CustomDecoder(BaseModel): + class Config: + extra = Extra.allow + + type: Literal["CustomDecoder"] + class_name: str = Field( + ..., + description="Fully-qualified name of the class that will be implementing the custom decoding. Has to be a sub class of Decoder. The format is `source_..`.", + examples=["source_amazon_ads.components.GzipJsonlDecoder"], + title="Class Name", + ) + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + +class GzipJsonDecoder(BaseModel): + class Config: + extra = Extra.allow + + type: Literal["GzipJsonDecoder"] + encoding: Optional[str] = "utf-8" + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + class MinMaxDatetime(BaseModel): type: Literal["MinMaxDatetime"] datetime: str = Field( @@ -1620,7 +1643,16 @@ class SimpleRetriever(BaseModel): description="PartitionRouter component that describes how to partition the stream, enabling incremental syncs and checkpointing.", title="Partition Router", ) - decoder: Optional[Union[JsonDecoder, JsonlDecoder, IterableDecoder, XmlDecoder]] = Field( + decoder: Optional[ + Union[ + CustomDecoder, + JsonDecoder, + JsonlDecoder, + IterableDecoder, + XmlDecoder, + GzipJsonDecoder, + ] + ] = Field( None, description="Component decoding the response so records can be extracted.", title="Decoder", @@ -1680,7 +1712,16 @@ class AsyncRetriever(BaseModel): description="PartitionRouter component that describes how to partition the stream, enabling incremental syncs and checkpointing.", title="Partition Router", ) - decoder: Optional[Union[JsonDecoder, JsonlDecoder, IterableDecoder, XmlDecoder]] = Field( + decoder: Optional[ + Union[ + CustomDecoder, + JsonDecoder, + JsonlDecoder, + IterableDecoder, + XmlDecoder, + GzipJsonDecoder, + ] + ] = Field( None, description="Component decoding the response so records can be extracted.", title="Decoder", diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 2812ba81..d2dd9d9d 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -58,6 +58,7 @@ from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream from airbyte_cdk.sources.declarative.decoders import ( Decoder, + GzipJsonDecoder, IterableDecoder, JsonDecoder, JsonlDecoder, @@ -134,6 +135,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( CustomBackoffStrategy as CustomBackoffStrategyModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + CustomDecoder as CustomDecoderModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( CustomErrorHandler as CustomErrorHandlerModel, ) @@ -182,6 +186,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ExponentialBackoffStrategy as ExponentialBackoffStrategyModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + GzipJsonDecoder as GzipJsonDecoderModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( HttpRequester as HttpRequesterModel, ) @@ -402,6 +409,7 @@ def _init_mappings(self) -> None: CursorPaginationModel: self.create_cursor_pagination, CustomAuthenticatorModel: self.create_custom_component, CustomBackoffStrategyModel: self.create_custom_component, + CustomDecoderModel: self.create_custom_component, CustomErrorHandlerModel: self.create_custom_component, CustomIncrementalSyncModel: self.create_custom_component, CustomRecordExtractorModel: self.create_custom_component, @@ -425,6 +433,7 @@ def _init_mappings(self) -> None: InlineSchemaLoaderModel: self.create_inline_schema_loader, JsonDecoderModel: self.create_json_decoder, JsonlDecoderModel: self.create_jsonl_decoder, + GzipJsonDecoderModel: self.create_gzipjson_decoder, KeysToLowerModel: self.create_keys_to_lower_transformation, IterableDecoderModel: self.create_iterable_decoder, XmlDecoderModel: self.create_xml_decoder, @@ -619,11 +628,16 @@ def create_legacy_to_per_partition_state_migration( "LegacyToPerPartitionStateMigrations can only be applied with a parent stream configuration." ) + if not hasattr(declarative_stream, "incremental_sync"): + raise ValueError( + "LegacyToPerPartitionStateMigrations can only be applied with an incremental_sync configuration." + ) + return LegacyToPerPartitionStateMigration( - declarative_stream.retriever.partition_router, - declarative_stream.incremental_sync, + partition_router, # type: ignore # was already checked above + declarative_stream.incremental_sync, # type: ignore # was already checked. Migration can be applied only to incremental streams. config, - declarative_stream.parameters, + declarative_stream.parameters, # type: ignore # different type is expected here Mapping[str, Any], got Dict[str, Any] ) # type: ignore # The retriever type was already checked def create_session_token_authenticator( @@ -1548,6 +1562,12 @@ def create_iterable_decoder( def create_xml_decoder(model: XmlDecoderModel, config: Config, **kwargs: Any) -> XmlDecoder: return XmlDecoder(parameters={}) + @staticmethod + def create_gzipjson_decoder( + model: GzipJsonDecoderModel, config: Config, **kwargs: Any + ) -> GzipJsonDecoder: + return GzipJsonDecoder(parameters={}, encoding=model.encoding) + @staticmethod def create_json_file_schema_loader( model: JsonFileSchemaLoaderModel, config: Config, **kwargs: Any diff --git a/unit_tests/sources/declarative/decoders/test_json_decoder.py b/unit_tests/sources/declarative/decoders/test_json_decoder.py index 861b6e27..8d4d22f7 100644 --- a/unit_tests/sources/declarative/decoders/test_json_decoder.py +++ b/unit_tests/sources/declarative/decoders/test_json_decoder.py @@ -1,6 +1,7 @@ # # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # +import gzip import json import os @@ -13,6 +14,7 @@ from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import ( ModelToComponentFactory, ) +from airbyte_cdk.sources.declarative.decoders import GzipJsonDecoder @pytest.mark.parametrize( @@ -120,3 +122,78 @@ def get_body(): counter += 1 assert counter == lines_in_response * len(stream_slices) + + +@pytest.mark.parametrize( + "encoding", + [ + "utf-8", + "utf", + ], + ids=["utf-8", "utf"], +) +def test_gzipjson_decoder(requests_mock, encoding): + response_to_compress = json.dumps( + [ + { + "campaignId": 214078428, + "campaignName": "sample-campaign-name-214078428", + "adGroupId": "6490134", + "adId": "665320125", + "targetId": "791320341", + "asin": "G000PSH142", + "advertisedAsin": "G000PSH142", + "keywordBid": "511234974", + "keywordId": "965783021", + }, + { + "campaignId": 44504582, + "campaignName": "sample-campaign-name-44504582", + "adGroupId": "6490134", + "adId": "665320125", + "targetId": "791320341", + "asin": "G000PSH142", + "advertisedAsin": "G000PSH142", + "keywordBid": "511234974", + "keywordId": "965783021", + }, + { + "campaignId": 509144838, + "campaignName": "sample-campaign-name-509144838", + "adGroupId": "6490134", + "adId": "665320125", + "targetId": "791320341", + "asin": "G000PSH142", + "advertisedAsin": "G000PSH142", + "keywordBid": "511234974", + "keywordId": "965783021", + }, + { + "campaignId": 231712082, + "campaignName": "sample-campaign-name-231712082", + "adGroupId": "6490134", + "adId": "665320125", + "targetId": "791320341", + "asin": "G000PSH142", + "advertisedAsin": "G000PSH142", + "keywordBid": "511234974", + "keywordId": "965783021", + }, + { + "campaignId": 895306040, + "campaignName": "sample-campaign-name-895306040", + "adGroupId": "6490134", + "adId": "665320125", + "targetId": "791320341", + "asin": "G000PSH142", + "advertisedAsin": "G000PSH142", + "keywordBid": "511234974", + "keywordId": "965783021", + }, + ] + ) + body = gzip.compress(response_to_compress.encode(encoding)) + + requests_mock.register_uri("GET", "https://airbyte.io/", content=body) + response = requests.get("https://airbyte.io/") + assert len(list(GzipJsonDecoder(parameters={}, encoding=encoding).decode(response))) == 5