From 8bfa7abfcfd19c56bfa8886206251d5201b677d6 Mon Sep 17 00:00:00 2001 From: Rusi Popov Date: Thu, 26 Dec 2024 02:07:05 +0200 Subject: [PATCH 1/2] #49971 Publish the response object in the transformation's context --- .../declarative/extractors/record_selector.py | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/airbyte_cdk/sources/declarative/extractors/record_selector.py b/airbyte_cdk/sources/declarative/extractors/record_selector.py index b2eed93b..9c50609c 100644 --- a/airbyte_cdk/sources/declarative/extractors/record_selector.py +++ b/airbyte_cdk/sources/declarative/extractors/record_selector.py @@ -7,6 +7,7 @@ import requests +from airbyte_cdk.sources.declarative.extractors.dpath_extractor import DpathExtractor from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter @@ -21,6 +22,7 @@ SchemaNormalization.Default: TransformConfig.DefaultSchemaNormalization, } +STREAM_SLICE_RESPONSE_KEY = "response" @dataclass class RecordSelector(HttpSelector): @@ -51,7 +53,8 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: if isinstance(self._name, str) else self._name ) - + self.response_root_extractor = DpathExtractor(field_path=[], config={}, parameters={}) + @property # type: ignore def name(self) -> str: """ @@ -86,9 +89,16 @@ def select_records( :return: List of Records selected from the response """ all_data: Iterable[Mapping[str, Any]] = self.extractor.extract_records(response) - yield from self.filter_and_transform( - all_data, stream_state, records_schema, stream_slice, next_page_token - ) + + response_root_data = self.response_root_extractor.extract_records(response) + stream_state.update({STREAM_SLICE_RESPONSE_KEY: response_root_data}) + try: + yield from self.filter_and_transform( + all_data, stream_state, records_schema, stream_slice, next_page_token + ) + finally: + stream_state.pop(STREAM_SLICE_RESPONSE_KEY) + def filter_and_transform( self, From e239779c7274c0cd9f73918b607f923edc6fa5dc Mon Sep 17 00:00:00 2001 From: Rusi Popov Date: Thu, 26 Dec 2024 05:01:17 +0200 Subject: [PATCH 2/2] 50395 Formatted the source code --- .../sources/declarative/extractors/record_selector.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/airbyte_cdk/sources/declarative/extractors/record_selector.py b/airbyte_cdk/sources/declarative/extractors/record_selector.py index 9c50609c..18c9aad0 100644 --- a/airbyte_cdk/sources/declarative/extractors/record_selector.py +++ b/airbyte_cdk/sources/declarative/extractors/record_selector.py @@ -24,6 +24,7 @@ STREAM_SLICE_RESPONSE_KEY = "response" + @dataclass class RecordSelector(HttpSelector): """ @@ -54,7 +55,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: else self._name ) self.response_root_extractor = DpathExtractor(field_path=[], config={}, parameters={}) - + @property # type: ignore def name(self) -> str: """ @@ -89,17 +90,16 @@ def select_records( :return: List of Records selected from the response """ all_data: Iterable[Mapping[str, Any]] = self.extractor.extract_records(response) - + response_root_data = self.response_root_extractor.extract_records(response) stream_state.update({STREAM_SLICE_RESPONSE_KEY: response_root_data}) - try: + try: yield from self.filter_and_transform( all_data, stream_state, records_schema, stream_slice, next_page_token ) finally: stream_state.pop(STREAM_SLICE_RESPONSE_KEY) - def filter_and_transform( self, all_data: Iterable[Mapping[str, Any]],