diff --git a/airbyte_cdk/sources/declarative/extractors/record_selector.py b/airbyte_cdk/sources/declarative/extractors/record_selector.py
index b2eed93b..18c9aad0 100644
--- a/airbyte_cdk/sources/declarative/extractors/record_selector.py
+++ b/airbyte_cdk/sources/declarative/extractors/record_selector.py
@@ -7,6 +7,7 @@
 
 import requests
 
+from airbyte_cdk.sources.declarative.extractors.dpath_extractor import DpathExtractor
 from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
 from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
 from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter
@@ -21,6 +22,8 @@
     SchemaNormalization.Default: TransformConfig.DefaultSchemaNormalization,
 }
 
+STREAM_SLICE_RESPONSE_KEY = "response"
+
 
 @dataclass
 class RecordSelector(HttpSelector):
@@ -51,6 +54,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
             if isinstance(self._name, str)
             else self._name
         )
+        self.response_root_extractor = DpathExtractor(field_path=[], config={}, parameters={})
 
     @property  # type: ignore
     def name(self) -> str:
@@ -86,9 +90,15 @@ def select_records(
         :return: List of Records selected from the response
         """
         all_data: Iterable[Mapping[str, Any]] = self.extractor.extract_records(response)
-        yield from self.filter_and_transform(
-            all_data, stream_state, records_schema, stream_slice, next_page_token
-        )
+
+        response_root_data = self.response_root_extractor.extract_records(response)
+        stream_state.update({STREAM_SLICE_RESPONSE_KEY: response_root_data})
+        try:
+            yield from self.filter_and_transform(
+                all_data, stream_state, records_schema, stream_slice, next_page_token
+            )
+        finally:
+            stream_state.pop(STREAM_SLICE_RESPONSE_KEY)
 
     def filter_and_transform(
         self,