diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index aa3cea70..1b607870 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -20,6 +20,9 @@ ClientSideIncrementalRecordFilterDecorator, ) from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor +from airbyte_cdk.sources.declarative.incremental.per_partition_with_global import ( + PerPartitionWithGlobalCursor, +) from airbyte_cdk.sources.declarative.interpolation import InterpolatedString from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( @@ -300,6 +303,60 @@ def _group_streams( cursor=final_state_cursor, ) ) + elif ( + incremental_sync_component_definition + and incremental_sync_component_definition.get("type", "") + == DatetimeBasedCursorModel.__name__ + and self._stream_supports_concurrent_partition_processing( + declarative_stream=declarative_stream + ) + and hasattr(declarative_stream.retriever, "stream_slicer") + and isinstance( + declarative_stream.retriever.stream_slicer, PerPartitionWithGlobalCursor + ) + ): + stream_state = state_manager.get_stream_state( + stream_name=declarative_stream.name, namespace=declarative_stream.namespace + ) + partition_router = declarative_stream.retriever.stream_slicer._partition_router + + cursor = self._constructor.create_concurrent_cursor_from_perpartition_cursor( + state_manager=state_manager, + model_type=DatetimeBasedCursorModel, + component_definition=incremental_sync_component_definition, + stream_name=declarative_stream.name, + stream_namespace=declarative_stream.namespace, + config=config or {}, + stream_state=stream_state, + partition_router=partition_router, + ) + + partition_generator = StreamSlicerPartitionGenerator( + DeclarativePartitionFactory( + declarative_stream.name, + declarative_stream.get_json_schema(), + self._retriever_factory( + name_to_stream_mapping[declarative_stream.name], + config, + stream_state, + ), + self.message_repository, + ), + cursor, + ) + + concurrent_streams.append( + DefaultStream( + partition_generator=partition_generator, + name=declarative_stream.name, + json_schema=declarative_stream.get_json_schema(), + availability_strategy=AlwaysAvailableAvailabilityStrategy(), + primary_key=get_primary_key_from_stream(declarative_stream.primary_key), + cursor_field=cursor.cursor_field.cursor_field_key, + logger=self.logger, + cursor=cursor, + ) + ) else: synchronous_streams.append(declarative_stream) else: diff --git a/airbyte_cdk/sources/declarative/extractors/record_filter.py b/airbyte_cdk/sources/declarative/extractors/record_filter.py index b744c979..37366961 100644 --- a/airbyte_cdk/sources/declarative/extractors/record_filter.py +++ b/airbyte_cdk/sources/declarative/extractors/record_filter.py @@ -59,13 +59,11 @@ class ClientSideIncrementalRecordFilterDecorator(RecordFilter): def __init__( self, - date_time_based_cursor: DatetimeBasedCursor, - substream_cursor: Optional[Union[PerPartitionWithGlobalCursor, GlobalSubstreamCursor]], + cursor: Union[DatetimeBasedCursor, PerPartitionWithGlobalCursor, GlobalSubstreamCursor], **kwargs: Any, ): super().__init__(**kwargs) - self._date_time_based_cursor = date_time_based_cursor - self._substream_cursor = substream_cursor + self._cursor = cursor def 
filter_records( self, @@ -77,7 +75,7 @@ def filter_records( records = ( record for record in records - if (self._substream_cursor or self._date_time_based_cursor).should_be_synced( + if self._cursor.should_be_synced( # Record is created on the fly to align with cursors interface; stream name is ignored as we don't need it here # Record stream name is empty because it is not used during the filtering Record(data=record, associated_slice=stream_slice, stream_name="") diff --git a/airbyte_cdk/sources/declarative/incremental/__init__.py b/airbyte_cdk/sources/declarative/incremental/__init__.py index 11c1cba9..7acd681e 100644 --- a/airbyte_cdk/sources/declarative/incremental/__init__.py +++ b/airbyte_cdk/sources/declarative/incremental/__init__.py @@ -2,6 +2,7 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # +from airbyte_cdk.sources.declarative.incremental.concurrent_partition_cursor import ConcurrentCursorFactory, ConcurrentPerPartitionCursor from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor from airbyte_cdk.sources.declarative.incremental.global_substream_cursor import GlobalSubstreamCursor @@ -14,6 +15,8 @@ __all__ = [ "CursorFactory", + "ConcurrentCursorFactory", + "ConcurrentPerPartitionCursor", "DatetimeBasedCursor", "DeclarativeCursor", "GlobalSubstreamCursor", diff --git a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py new file mode 100644 index 00000000..83b774fe --- /dev/null +++ b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py @@ -0,0 +1,344 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# +import copy +import logging +import threading + +from collections import OrderedDict +from copy import deepcopy +from typing import Any, Callable, Iterable, Mapping, MutableMapping, Optional + +from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager +from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor +from airbyte_cdk.sources.declarative.incremental.global_substream_cursor import ( + Timer, + iterate_with_last_flag_and_state, +) +from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter +from airbyte_cdk.sources.message import MessageRepository +from airbyte_cdk.sources.streams.checkpoint.per_partition_key_serializer import ( + PerPartitionKeySerializer, +) +from airbyte_cdk.sources.streams.concurrent.cursor import Cursor, CursorField +from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition +from airbyte_cdk.sources.types import Record, StreamSlice, StreamState + +logger = logging.getLogger("airbyte") + + +class ConcurrentCursorFactory: + def __init__(self, create_function: Callable[..., Cursor]): + self._create_function = create_function + + def create(self, stream_state: Mapping[str, Any]) -> Cursor: + return self._create_function(stream_state=stream_state) + + +class ConcurrentPerPartitionCursor(Cursor): + """ + Manages state per partition when a stream has many partitions, to prevent data loss or duplication. + + **Partition Limitation and Limit Reached Logic** + + - **DEFAULT_MAX_PARTITIONS_NUMBER**: The maximum number of partitions to keep in memory (default is 10,000). + - **_cursor_per_partition**: An ordered dictionary that stores cursors for each partition.
+ - **_over_limit**: A counter that increments each time an oldest partition is removed when the limit is exceeded. + + The class ensures that the number of partitions tracked does not exceed the `DEFAULT_MAX_PARTITIONS_NUMBER` to prevent excessive memory usage. + + - When the number of partitions exceeds the limit, the oldest partitions are removed from `_cursor_per_partition`, and `_over_limit` is incremented accordingly. + - The `limit_reached` method returns `True` when `_over_limit` exceeds `DEFAULT_MAX_PARTITIONS_NUMBER`, indicating that the global cursor should be used instead of per-partition cursors. + + This approach avoids unnecessary switching to a global cursor due to temporary spikes in partition counts, ensuring that switching is only done when a sustained high number of partitions is observed. + """ + + DEFAULT_MAX_PARTITIONS_NUMBER = 10000 + _NO_STATE: Mapping[str, Any] = {} + _NO_CURSOR_STATE: Mapping[str, Any] = {} + _KEY = 0 + _VALUE = 1 + + def __init__( + self, + cursor_factory: ConcurrentCursorFactory, + partition_router: PartitionRouter, + stream_name: str, + stream_namespace: Optional[str], + stream_state: Any, + message_repository: MessageRepository, + connector_state_manager: ConnectorStateManager, + cursor_field: CursorField, + ) -> None: + self._global_cursor: Mapping[str, Any] = {} + self._stream_name = stream_name + self._stream_namespace = stream_namespace + self._message_repository = message_repository + self._connector_state_manager = connector_state_manager + self._cursor_field = cursor_field + + self._cursor_factory = cursor_factory + self._partition_router = partition_router + + # The dict is ordered to ensure that once the maximum number of partitions is reached, + # the oldest partitions can be efficiently removed, maintaining the most recent partitions. 
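+ # For example: once the number of tracked partitions reaches DEFAULT_MAX_PARTITIONS_NUMBER, _ensure_partition_limit() + # evicts the oldest entry via popitem(last=False) and increments _over_limit; limit_reached() only returns True + # once _over_limit itself exceeds DEFAULT_MAX_PARTITIONS_NUMBER, i.e. after a sustained overflow.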
+ self._cursor_per_partition: OrderedDict[str, Cursor] = OrderedDict() + self._state = {"states": []} + self._semaphore_per_partition = OrderedDict() + self._finished_partitions = set() + self._lock = threading.Lock() + self._timer = Timer() + self._new_global_cursor = None + self._lookback_window = 0 + self._parent_state = None + self._over_limit = 0 + self._partition_serializer = PerPartitionKeySerializer() + + self._set_initial_state(stream_state) + + @property + def cursor_field(self) -> CursorField: + return self._cursor_field + + @property + def state(self) -> MutableMapping[str, Any]: + states = [] + for partition_tuple, cursor in self._cursor_per_partition.items(): + cursor_state = cursor._connector_state_converter.convert_to_state_message( + self.cursor_field, cursor.state + ) + if cursor_state: + states.append( + { + "partition": self._to_dict(partition_tuple), + "cursor": copy.deepcopy(cursor_state), + } + ) + state: dict[str, Any] = {"states": states} + + if self._global_cursor: + state["state"] = self._global_cursor + if self._lookback_window is not None: + state["lookback_window"] = self._lookback_window + if self._parent_state is not None: + state["parent_state"] = self._parent_state + return state + + def close_partition(self, partition: Partition) -> None: + self._cursor_per_partition[ + self._to_partition_key(partition._stream_slice.partition) + ].close_partition(partition=partition) + with self._lock: + self._semaphore_per_partition[ + self._to_partition_key(partition._stream_slice.partition) + ].acquire() + cursor = self._cursor_per_partition[ + self._to_partition_key(partition._stream_slice.partition) + ] + cursor_state = cursor._connector_state_converter.convert_to_state_message( + cursor._cursor_field, cursor.state + ) + # Once every slice of a finished partition has been closed, fold its latest cursor value into the candidate global cursor + if ( + self._to_partition_key(partition._stream_slice.partition) + in self._finished_partitions + and self._semaphore_per_partition[ + self._to_partition_key(partition._stream_slice.partition) + ]._value + == 0 + ): + if ( + self._new_global_cursor is None + or self._new_global_cursor[self.cursor_field.cursor_field_key] + < cursor_state[self.cursor_field.cursor_field_key] + ): + self._new_global_cursor = copy.deepcopy(cursor_state) + + def ensure_at_least_one_state_emitted(self) -> None: + """ + The platform expects to have at least one state message on successful syncs. Hence, whatever happens, we expect this method to be + called.
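+ If all partition semaphores have been drained, the global cursor, lookback window and parent state are updated + from the completed sync before the state message is emitted.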
+ """ + if not any( + semaphore_item[1]._value for semaphore_item in self._semaphore_per_partition.items() + ): + self._global_cursor = self._new_global_cursor + self._lookback_window = self._timer.finish() + self._parent_state = self._partition_router.get_stream_state() + self._emit_state_message() + + def _emit_state_message(self) -> None: + self._connector_state_manager.update_state_for_stream( + self._stream_name, + self._stream_namespace, + self.state, + ) + state_message = self._connector_state_manager.create_state_message( + self._stream_name, self._stream_namespace + ) + self._message_repository.emit_message(state_message) + + def stream_slices(self) -> Iterable[StreamSlice]: + slices = self._partition_router.stream_slices() + self._timer.start() + for partition in slices: + yield from self.generate_slices_from_partition(partition) + + def generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[StreamSlice]: + # Ensure the maximum number of partitions is not exceeded + self._ensure_partition_limit() + + cursor = self._cursor_per_partition.get(self._to_partition_key(partition.partition)) + if not cursor: + partition_state = ( + self._global_cursor + if self._global_cursor + else self._NO_CURSOR_STATE + ) + cursor = self._create_cursor(partition_state) + self._cursor_per_partition[self._to_partition_key(partition.partition)] = cursor + self._semaphore_per_partition[self._to_partition_key(partition.partition)] = ( + threading.Semaphore(0) + ) + + for cursor_slice, is_last_slice, _ in iterate_with_last_flag_and_state( + cursor.stream_slices(), + lambda: None, + ): + self._semaphore_per_partition[self._to_partition_key(partition.partition)].release() + if is_last_slice: + self._finished_partitions.add(self._to_partition_key(partition.partition)) + yield StreamSlice( + partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields + ) + + def _ensure_partition_limit(self) -> None: + """ + Ensure the maximum number of partitions is not exceeded. If so, the oldest added partition will be dropped. + """ + while len(self._cursor_per_partition) > self.DEFAULT_MAX_PARTITIONS_NUMBER - 1: + self._over_limit += 1 + oldest_partition = self._cursor_per_partition.popitem(last=False)[ + 0 + ] # Remove the oldest partition + logger.warning( + f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}. Over limit: {self._over_limit}." + ) + + def limit_reached(self) -> bool: + return self._over_limit > self.DEFAULT_MAX_PARTITIONS_NUMBER + + def _set_initial_state(self, stream_state: StreamState) -> None: + """ + Set the initial state for the cursors. + + This method initializes the state for each partition cursor using the provided stream state. + If a partition state is provided in the stream state, it will update the corresponding partition cursor with this state. + + Additionally, it sets the parent state for partition routers that are based on parent streams. If a partition router + does not have parent streams, this step will be skipped due to the default PartitionRouter implementation. + + Args: + stream_state (StreamState): The state of the streams to be set. 
The format of the stream state should be: + { + "states": [ + { + "partition": { + "partition_key": "value" + }, + "cursor": { + "last_updated": "2023-05-27T00:00:00Z" + } + } + ], + "parent_state": { + "parent_stream_name": { + "last_updated": "2023-05-27T00:00:00Z" + } + } + } + """ + if not stream_state: + return + + if "states" not in stream_state: + # We assume that `stream_state` is in a global format that can be applied to all partitions. + # Example: {"global_state_format_key": "global_state_format_value"} + self._global_cursor = deepcopy(stream_state) + self._new_global_cursor = deepcopy(stream_state) + + else: + for state in stream_state["states"]: + self._cursor_per_partition[self._to_partition_key(state["partition"])] = ( + self._create_cursor(state["cursor"]) + ) + self._semaphore_per_partition[self._to_partition_key(state["partition"])] = ( + threading.Semaphore(0) + ) + + # Set the default state for missing partitions if the state is per-partition with a global fallback + if "state" in stream_state: + self._global_cursor = deepcopy(stream_state["state"]) + self._new_global_cursor = deepcopy(stream_state["state"]) + + # Set parent state for partition routers based on parent streams + self._partition_router.set_initial_state(stream_state) + + def observe(self, record: Record) -> None: + # Delegate observation to the per-partition cursor that owns the record's slice + self._cursor_per_partition[ + self._to_partition_key(record.associated_slice.partition) + ].observe(record) + + def _to_partition_key(self, partition: Mapping[str, Any]) -> str: + return self._partition_serializer.to_partition_key(partition) + + def _to_dict(self, partition_key: str) -> Mapping[str, Any]: + return self._partition_serializer.to_partition(partition_key) + + def _create_cursor(self, cursor_state: Any) -> Cursor: + cursor = self._cursor_factory.create(stream_state=deepcopy(cursor_state)) + return cursor + + def should_be_synced(self, record: Record) -> bool: + return self._get_cursor(record).should_be_synced(record) + + def is_greater_than_or_equal(self, first: Record, second: Record) -> bool: + if not first.associated_slice or not second.associated_slice: + raise ValueError( + f"Both records should have an associated slice but got {first.associated_slice} and {second.associated_slice}" + ) + if first.associated_slice.partition != second.associated_slice.partition: + raise ValueError( + f"To compare records, partition should be the same but got {first.associated_slice.partition} and {second.associated_slice.partition}" + ) + + return self._get_cursor(first).is_greater_than_or_equal( + self._convert_record_to_cursor_record(first), + self._convert_record_to_cursor_record(second), + ) + + @staticmethod + def _convert_record_to_cursor_record(record: Record) -> Record: + return Record( + record.data, + StreamSlice(partition={}, cursor_slice=record.associated_slice.cursor_slice) + if record.associated_slice + else None, + ) + + def _get_cursor(self, record: Record) -> Cursor: + if not record.associated_slice: + raise ValueError( + "Invalid state as stream slices that are emitted should refer to an existing cursor" + ) + partition_key = self._to_partition_key(record.associated_slice.partition) + if partition_key not in self._cursor_per_partition: + raise ValueError( + "Invalid state as stream slices that are emitted should refer to an existing cursor" + ) + cursor =
self._cursor_per_partition[partition_key] + return cursor diff --git a/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py b/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py index 8241f776..1529e90e 100644 --- a/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py +++ b/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py @@ -303,6 +303,20 @@ def get_request_body_json( raise ValueError("A partition needs to be provided in order to get request body json") def should_be_synced(self, record: Record) -> bool: + if ( + self._to_partition_key(record.associated_slice.partition) + not in self._cursor_per_partition + ): + partition_state = ( + self._state_to_migrate_from + if self._state_to_migrate_from + else self._NO_CURSOR_STATE + ) + cursor = self._create_cursor(partition_state) + + self._cursor_per_partition[ + self._to_partition_key(record.associated_slice.partition) + ] = cursor return self._get_cursor(record).should_be_synced( self._convert_record_to_cursor_record(record) ) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 3ff6a871..2decfbd4 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -81,6 +81,8 @@ ) from airbyte_cdk.sources.declarative.incremental import ( ChildPartitionResumableFullRefreshCursor, + ConcurrentCursorFactory, + ConcurrentPerPartitionCursor, CursorFactory, DatetimeBasedCursor, DeclarativeCursor, @@ -400,6 +402,7 @@ InMemoryMessageRepository, LogAppenderMessageRepositoryDecorator, MessageRepository, + NoopMessageRepository, ) from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, CursorField from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import ( @@ -808,6 +811,7 @@ def create_concurrent_cursor_from_datetime_based_cursor( stream_namespace: Optional[str], config: Config, stream_state: MutableMapping[str, Any], + message_repository: Optional[MessageRepository] = None, **kwargs: Any, ) -> ConcurrentCursor: component_type = component_definition.get("type") @@ -943,7 +947,7 @@ def create_concurrent_cursor_from_datetime_based_cursor( stream_name=stream_name, stream_namespace=stream_namespace, stream_state=stream_state, - message_repository=self._message_repository, + message_repository=message_repository or self._message_repository, connector_state_manager=state_manager, connector_state_converter=connector_state_converter, cursor_field=cursor_field, @@ -955,6 +959,63 @@ def create_concurrent_cursor_from_datetime_based_cursor( cursor_granularity=cursor_granularity, ) + def create_concurrent_cursor_from_perpartition_cursor( + self, + state_manager: ConnectorStateManager, + model_type: Type[BaseModel], + component_definition: ComponentDefinition, + stream_name: str, + stream_namespace: Optional[str], + config: Config, + stream_state: MutableMapping[str, Any], + partition_router, + **kwargs: Any, + ) -> ConcurrentPerPartitionCursor: + component_type = component_definition.get("type") + if component_definition.get("type") != model_type.__name__: + raise ValueError( + f"Expected manifest component of type {model_type.__name__}, but received {component_type} instead" + ) + + datetime_based_cursor_model = model_type.parse_obj(component_definition) + + if not isinstance(datetime_based_cursor_model, DatetimeBasedCursorModel): + raise 
ValueError( + f"Expected {model_type.__name__} component, but received {datetime_based_cursor_model.__class__.__name__}" + ) + + interpolated_cursor_field = InterpolatedString.create( + datetime_based_cursor_model.cursor_field, + parameters=datetime_based_cursor_model.parameters or {}, + ) + cursor_field = CursorField(interpolated_cursor_field.eval(config=config)) + + # Create the cursor factory + cursor_factory = ConcurrentCursorFactory( + partial( + self.create_concurrent_cursor_from_datetime_based_cursor, + state_manager=state_manager, + model_type=model_type, + component_definition=component_definition, + stream_name=stream_name, + stream_namespace=stream_namespace, + config=config, + message_repository=NoopMessageRepository(), + ) + ) + + # Return the concurrent cursor and state converter + return ConcurrentPerPartitionCursor( + cursor_factory=cursor_factory, + partition_router=partition_router, + stream_name=stream_name, + stream_namespace=stream_namespace, + stream_state=stream_state, + message_repository=self._message_repository, # type: ignore + connector_state_manager=state_manager, + cursor_field=cursor_field, + ) + @staticmethod def create_constant_backoff_strategy( model: ConstantBackoffStrategyModel, config: Config, **kwargs: Any @@ -1237,18 +1298,15 @@ def create_declarative_stream( raise ValueError( "Unsupported Slicer is used. PerPartitionWithGlobalCursor should be used here instead" ) - client_side_incremental_sync = { - "date_time_based_cursor": self._create_component_from_model( - model=model.incremental_sync, config=config - ), - "substream_cursor": ( - combined_slicers - if isinstance( - combined_slicers, (PerPartitionWithGlobalCursor, GlobalSubstreamCursor) - ) - else None - ), - } + cursor = ( + combined_slicers + if isinstance( + combined_slicers, (PerPartitionWithGlobalCursor, GlobalSubstreamCursor) + ) + else self._create_component_from_model(model=model.incremental_sync, config=config) + ) + + client_side_incremental_sync = {"cursor": cursor} if model.incremental_sync and isinstance(model.incremental_sync, DatetimeBasedCursorModel): cursor_model = model.incremental_sync @@ -2026,7 +2084,7 @@ def create_simple_retriever( if ( not isinstance(stream_slicer, DatetimeBasedCursor) or type(stream_slicer) is not DatetimeBasedCursor - ): + ) and not isinstance(stream_slicer, PerPartitionWithGlobalCursor): # Many of the custom component implementations of DatetimeBasedCursor override get_request_params() (or other methods). # Because we're decoupling RequestOptionsProvider from the Cursor, custom components will eventually need to reimplement # their own RequestOptionsProvider. 
However, right now the existing StreamSlicer/Cursor still can act as the SimpleRetriever's diff --git a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index cc704059..3936dda4 100644 --- a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -178,7 +178,7 @@ def _request_headers( stream_slice, next_page_token, self._paginator.get_request_headers, - self.stream_slicer.get_request_headers, + self.request_option_provider.get_request_headers, ) if isinstance(headers, str): raise ValueError("Request headers cannot be a string") diff --git a/unit_tests/sources/declarative/extractors/test_record_filter.py b/unit_tests/sources/declarative/extractors/test_record_filter.py index 12f06a94..5df39132 100644 --- a/unit_tests/sources/declarative/extractors/test_record_filter.py +++ b/unit_tests/sources/declarative/extractors/test_record_filter.py @@ -290,8 +290,7 @@ def test_client_side_record_filter_decorator_no_parent_stream( config={}, condition=record_filter_expression, parameters={}, - date_time_based_cursor=date_time_based_cursor, - substream_cursor=None, + cursor=date_time_based_cursor, ) filtered_records = list( @@ -429,8 +428,7 @@ def date_time_based_cursor_factory() -> DatetimeBasedCursor: record_filter_decorator = ClientSideIncrementalRecordFilterDecorator( config={}, parameters={}, - date_time_based_cursor=date_time_based_cursor, - substream_cursor=substream_cursor, + cursor=substream_cursor or date_time_based_cursor, ) # The partition we're testing diff --git a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py new file mode 100644 index 00000000..cdeb70d4 --- /dev/null +++ b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py @@ -0,0 +1,1840 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
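+# +# End-to-end tests for ConcurrentPerPartitionCursor, exercised through ConcurrentDeclarativeSource against a +# three-level substream manifest (posts -> post_comments -> post_comment_votes) with mocked HTTP responses.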
+ +import copy +from copy import deepcopy +from typing import Any, List, Mapping, MutableMapping, Optional, Union +from unittest.mock import MagicMock + +import pytest +import requests_mock +from orjson import orjson + +from airbyte_cdk.models import ( + AirbyteMessage, + AirbyteStateBlob, + AirbyteStateMessage, + AirbyteStateType, + AirbyteStream, + AirbyteStreamState, + ConfiguredAirbyteCatalog, + ConfiguredAirbyteStream, + DestinationSyncMode, + StreamDescriptor, + SyncMode, +) +from airbyte_cdk.sources.declarative.concurrent_declarative_source import ( + ConcurrentDeclarativeSource, +) + +SUBSTREAM_MANIFEST: MutableMapping[str, Any] = { + "version": "0.51.42", + "type": "DeclarativeSource", + "check": {"type": "CheckStream", "stream_names": ["post_comment_votes"]}, + "definitions": { + "basic_authenticator": { + "type": "BasicHttpAuthenticator", + "username": "{{ config['credentials']['email'] + '/token' }}", + "password": "{{ config['credentials']['api_token'] }}", + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://api.example.com", + "http_method": "GET", + "authenticator": "#/definitions/basic_authenticator", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": { + "type": "DpathExtractor", + "field_path": ["{{ parameters.get('data_path') or parameters['name'] }}"], + }, + "schema_normalization": "Default", + }, + "paginator": { + "type": "DefaultPaginator", + "page_size_option": { + "type": "RequestOption", + "field_name": "per_page", + "inject_into": "request_parameter", + }, + "pagination_strategy": { + "type": "CursorPagination", + "page_size": 100, + "cursor_value": "{{ response.get('next_page', {}) }}", + "stop_condition": "{{ not response.get('next_page', {}) }}", + }, + "page_token_option": {"type": "RequestPath"}, + }, + }, + "cursor_incremental_sync": { + "type": "DatetimeBasedCursor", + "cursor_datetime_formats": ["%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S%z"], + "datetime_format": "%Y-%m-%dT%H:%M:%SZ", + "cursor_field": "{{ parameters.get('cursor_field', 'updated_at') }}", + "start_datetime": {"datetime": "{{ config.get('start_date')}}"}, + "start_time_option": { + "inject_into": "request_parameter", + "field_name": "start_time", + "type": "RequestOption", + }, + }, + "posts_stream": { + "type": "DeclarativeStream", + "name": "posts", + "primary_key": ["id"], + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": { + "id": {"type": "integer"}, + "updated_at": {"type": "string", "format": "date-time"}, + "title": {"type": "string"}, + "content": {"type": "string"}, + }, + "type": "object", + }, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://api.example.com", + "path": "/community/posts", + "http_method": "GET", + "authenticator": "#/definitions/basic_authenticator", + }, + "record_selector": "#/definitions/retriever/record_selector", + "paginator": "#/definitions/retriever/paginator", + }, + "incremental_sync": "#/definitions/cursor_incremental_sync", + "$parameters": { + "name": "posts", + "path": "community/posts", + "data_path": "posts", + "cursor_field": "updated_at", + "primary_key": "id", + }, + }, + "post_comments_stream": { + "type": "DeclarativeStream", + "name": "post_comments", + "primary_key": ["id"], + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": { + 
"id": {"type": "integer"}, + "updated_at": {"type": "string", "format": "date-time"}, + "post_id": {"type": "integer"}, + "comment": {"type": "string"}, + }, + "type": "object", + }, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://api.example.com", + "path": "/community/posts/{{ stream_slice.id }}/comments", + "http_method": "GET", + "authenticator": "#/definitions/basic_authenticator", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": ["comments"]}, + "record_filter": { + "condition": "{{ record['updated_at'] >= stream_state.get('updated_at', config.get('start_date')) }}" + }, + }, + "paginator": "#/definitions/retriever/paginator", + "partition_router": { + "type": "SubstreamPartitionRouter", + "parent_stream_configs": [ + { + "stream": "#/definitions/posts_stream", + "parent_key": "id", + "partition_field": "id", + "incremental_dependency": True, + } + ], + }, + }, + "incremental_sync": { + "type": "DatetimeBasedCursor", + "cursor_datetime_formats": ["%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S%z"], + "datetime_format": "%Y-%m-%dT%H:%M:%SZ", + "cursor_field": "{{ parameters.get('cursor_field', 'updated_at') }}", + "start_datetime": {"datetime": "{{ config.get('start_date') }}"}, + }, + "$parameters": { + "name": "post_comments", + "path": "community/posts/{{ stream_slice.id }}/comments", + "data_path": "comments", + "cursor_field": "updated_at", + "primary_key": "id", + }, + }, + "post_comment_votes_stream": { + "type": "DeclarativeStream", + "name": "post_comment_votes", + "primary_key": ["id"], + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": { + "id": {"type": "integer"}, + "created_at": {"type": "string", "format": "date-time"}, + "comment_id": {"type": "integer"}, + "vote": {"type": "number"}, + }, + "type": "object", + }, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://api.example.com", + "path": "/community/posts/{{ stream_slice.parent_slice.id }}/comments/{{ stream_slice.id }}/votes", + "http_method": "GET", + "authenticator": "#/definitions/basic_authenticator", + }, + "record_selector": "#/definitions/retriever/record_selector", + "paginator": "#/definitions/retriever/paginator", + "partition_router": { + "type": "SubstreamPartitionRouter", + "parent_stream_configs": [ + { + "stream": "#/definitions/post_comments_stream", + "parent_key": "id", + "partition_field": "id", + "incremental_dependency": True, + } + ], + }, + }, + "incremental_sync": "#/definitions/cursor_incremental_sync", + "$parameters": { + "name": "post_comment_votes", + "path": "community/posts/{{ stream_slice.parent_slice.id }}/comments/{{ stream_slice.id }}/votes", + "data_path": "votes", + "cursor_field": "created_at", + "primary_key": "id", + }, + }, + }, + "streams": [ + {"$ref": "#/definitions/posts_stream"}, + {"$ref": "#/definitions/post_comments_stream"}, + {"$ref": "#/definitions/post_comment_votes_stream"}, + ], + "concurrency_level": { + "type": "ConcurrencyLevel", + "default_concurrency": "{{ config['num_workers'] or 10 }}", + "max_concurrency": 25, + }, +} + +SUBSTREAM_MANIFEST_NO_DEPENDENCY = deepcopy(SUBSTREAM_MANIFEST) +# Disable incremental_dependency +SUBSTREAM_MANIFEST_NO_DEPENDENCY["definitions"]["post_comments_stream"]["retriever"]["partition_router"][ + "parent_stream_configs" +][0]["incremental_dependency"] = 
False +SUBSTREAM_MANIFEST_NO_DEPENDENCY["definitions"]["post_comment_votes_stream"]["retriever"]["partition_router"][ + "parent_stream_configs" +][0]["incremental_dependency"] = False + + + +def _run_read( + manifest: Mapping[str, Any], + config: Mapping[str, Any], + stream_name: str, + state: Optional[Union[List[AirbyteStateMessage], MutableMapping[str, Any]]] = None, +) -> List[AirbyteMessage]: + catalog = ConfiguredAirbyteCatalog( + streams=[ + ConfiguredAirbyteStream( + stream=AirbyteStream( + name=stream_name, + json_schema={}, + supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental], + ), + sync_mode=SyncMode.incremental, + destination_sync_mode=DestinationSyncMode.append, + ) + ] + ) + logger = MagicMock() + source = ConcurrentDeclarativeSource( + source_config=manifest, config=config, catalog=catalog, state=state + ) + messages = list(source.read(logger=source.logger, config=config, catalog=catalog, state=[])) + return messages + + +@pytest.mark.parametrize( + "test_name, manifest, mock_requests, expected_records, initial_state, expected_state", + [ + ( + "test_incremental_parent_state", + SUBSTREAM_MANIFEST_NO_DEPENDENCY, + [ + # Fetch the first page of posts + ( + "https://api.example.com/community/posts?per_page=100&start_time=2024-01-01T00:00:01Z", + { + "posts": [ + {"id": 1, "updated_at": "2024-01-30T00:00:00Z"}, + {"id": 2, "updated_at": "2024-01-29T00:00:00Z"}, + ], + "next_page": "https://api.example.com/community/posts?per_page=100&start_time=2024-01-05T00:00:00Z&page=2", + }, + ), + # Fetch the second page of posts + ( + "https://api.example.com/community/posts?per_page=100&start_time=2024-01-01T00:00:01Z&page=2", + {"posts": [{"id": 3, "updated_at": "2024-01-28T00:00:00Z"}]}, + ), + # Fetch the first page of comments for post 1 + ( + "https://api.example.com/community/posts/1/comments?per_page=100", + { + "comments": [ + {"id": 9, "post_id": 1, "updated_at": "2023-01-01T00:00:00Z"}, + {"id": 10, "post_id": 1, "updated_at": "2024-01-25T00:00:00Z"}, + {"id": 11, "post_id": 1, "updated_at": "2024-01-24T00:00:00Z"}, + ], + "next_page": "https://api.example.com/community/posts/1/comments?per_page=100&page=2", + }, + ), + # Fetch the second page of comments for post 1 + ( + "https://api.example.com/community/posts/1/comments?per_page=100&page=2", + {"comments": [{"id": 12, "post_id": 1, "updated_at": "2024-01-23T00:00:00Z"}]}, + ), + # Fetch the first page of votes for comment 10 of post 1 + ( + "https://api.example.com/community/posts/1/comments/10/votes?per_page=100&start_time=2024-01-02T00:00:00Z", + { + "votes": [ + {"id": 100, "comment_id": 10, "created_at": "2024-01-15T00:00:00Z"} + ], + "next_page": "https://api.example.com/community/posts/1/comments/10/votes?per_page=100&page=2&start_time=2024-01-01T00:00:01Z", + }, + ), + # Fetch the second page of votes for comment 10 of post 1 + ( + "https://api.example.com/community/posts/1/comments/10/votes?per_page=100&page=2&start_time=2024-01-01T00:00:01Z", + { + "votes": [ + {"id": 101, "comment_id": 10, "created_at": "2024-01-14T00:00:00Z"} + ] + }, + ), + # Fetch the first page of votes for comment 11 of post 1 + ( + "https://api.example.com/community/posts/1/comments/11/votes?per_page=100&start_time=2024-01-03T00:00:00Z", + { + "votes": [ + {"id": 102, "comment_id": 11, "created_at": "2024-01-13T00:00:00Z"} + ] + }, + ), + # Fetch the first page of votes for comment 12 of post 1 + ( + "https://api.example.com/community/posts/1/comments/12/votes?per_page=100&start_time=2024-01-01T00:00:01Z", + {"votes": []}, + 
), + # Fetch the first page of comments for post 2 + ( + "https://api.example.com/community/posts/2/comments?per_page=100", + { + "comments": [ + {"id": 20, "post_id": 2, "updated_at": "2024-01-22T00:00:00Z"} + ], + "next_page": "https://api.example.com/community/posts/2/comments?per_page=100&page=2", + }, + ), + # Fetch the second page of comments for post 2 + ( + "https://api.example.com/community/posts/2/comments?per_page=100&page=2", + {"comments": [{"id": 21, "post_id": 2, "updated_at": "2024-01-21T00:00:00Z"}]}, + ), + # Fetch the first page of votes for comment 20 of post 2 + ( + "https://api.example.com/community/posts/2/comments/20/votes?per_page=100&start_time=2024-01-01T00:00:01Z", + { + "votes": [ + {"id": 200, "comment_id": 20, "created_at": "2024-01-12T00:00:00Z"} + ] + }, + ), + # Fetch the first page of votes for comment 21 of post 2 + ( + "https://api.example.com/community/posts/2/comments/21/votes?per_page=100&start_time=2024-01-01T00:00:01Z", + { + "votes": [ + {"id": 201, "comment_id": 21, "created_at": "2024-01-12T00:00:15Z"} + ] + }, + ), + # Fetch the first page of comments for post 3 + ( + "https://api.example.com/community/posts/3/comments?per_page=100", + {"comments": [{"id": 30, "post_id": 3, "updated_at": "2024-01-09T00:00:00Z"}]}, + ), + # Fetch the first page of votes for comment 30 of post 3 + ( + "https://api.example.com/community/posts/3/comments/30/votes?per_page=100", + { + "votes": [ + {"id": 300, "comment_id": 30, "created_at": "2024-01-10T00:00:00Z"} + ] + }, + ), + ], + # Expected records + [ + {"id": 100, "comment_id": 10, "created_at": "2024-01-15T00:00:00Z"}, + {"id": 101, "comment_id": 10, "created_at": "2024-01-14T00:00:00Z"}, + {"id": 102, "comment_id": 11, "created_at": "2024-01-13T00:00:00Z"}, + {"id": 200, "comment_id": 20, "created_at": "2024-01-12T00:00:00Z"}, + {"id": 201, "comment_id": 21, "created_at": "2024-01-12T00:00:15Z"}, + {"id": 300, "comment_id": 30, "created_at": "2024-01-10T00:00:00Z"}, + ], + # Initial state + [ + AirbyteStateMessage( + type=AirbyteStateType.STREAM, + stream=AirbyteStreamState( + stream_descriptor=StreamDescriptor( + name="post_comment_votes", namespace=None + ), + stream_state=AirbyteStateBlob( + { + # This should not happen since parent state is disabled, but I've added this to validate that and + # incoming parent_state is ignored when the parent stream's incremental_dependency is disabled + "parent_state": { + "post_comments": { + "states": [ + { + "partition": {"id": 1, "parent_slice": {}}, + "cursor": {"updated_at": "2023-01-04T00:00:00Z"}, + } + ], + "parent_state": { + "posts": {"updated_at": "2024-01-05T00:00:00Z"} + }, + } + }, + "states": [ + { + "partition": { + "id": 10, + "parent_slice": {"id": 1, "parent_slice": {}}, + }, + "cursor": {"created_at": "2024-01-02T00:00:00Z"}, + }, + { + "partition": { + "id": 11, + "parent_slice": {"id": 1, "parent_slice": {}}, + }, + "cursor": {"created_at": "2024-01-03T00:00:00Z"}, + }, + ], + } + ), + ), + ) + ], + # Expected state + { + "states": [ + { + "partition": {"id": 10, "parent_slice": {"id": 1, "parent_slice": {}}}, + "cursor": {"created_at": "2024-01-15T00:00:00Z"}, + }, + { + "partition": {"id": 11, "parent_slice": {"id": 1, "parent_slice": {}}}, + "cursor": {"created_at": "2024-01-13T00:00:00Z"}, + }, + { + "cursor": {"created_at": "2024-01-01T00:00:01Z"}, + "partition": {"id": 12, "parent_slice": {"id": 1, "parent_slice": {}}}, + }, + { + "partition": {"id": 20, "parent_slice": {"id": 2, "parent_slice": {}}}, + "cursor": {"created_at": 
"2024-01-12T00:00:00Z"}, + }, + { + "partition": {"id": 21, "parent_slice": {"id": 2, "parent_slice": {}}}, + "cursor": {"created_at": "2024-01-12T00:00:15Z"}, + }, + { + "partition": {"id": 30, "parent_slice": {"id": 3, "parent_slice": {}}}, + "cursor": {"created_at": "2024-01-10T00:00:00Z"}, + }, + ], + "lookback_window": 1, + "parent_state": {}, + "state": {"created_at": "2024-01-15T00:00:00Z"}, + }, + ), + ], +) +def test_incremental_parent_state_no_incremental_dependency( + test_name, manifest, mock_requests, expected_records, initial_state, expected_state +): + """ + This is a pretty complicated test that syncs a low-code connector stream with three levels of substreams + - posts: (ids: 1, 2, 3) + - post comments: (parent post 1 with ids: 9, 10, 11, 12; parent post 2 with ids: 20, 21; parent post 3 with id: 30) + - post comment votes: (parent comment 10 with ids: 100, 101; parent comment 11 with id: 102; + parent comment 20 with id: 200; parent comment 21 with id: 201, parent comment 30 with id: 300) + + By setting incremental_dependency to false, parent streams will not use the incoming state and will not update state. + The post_comment_votes substream is incremental and will emit state messages We verify this by ensuring that mocked + parent stream requests use the incoming config as query parameters and the substream state messages does not + contain parent stream state. + """ + + _stream_name = "post_comment_votes" + config = { + "start_date": "2024-01-01T00:00:01Z", + "credentials": {"email": "email", "api_token": "api_token"}, + } + + with requests_mock.Mocker() as m: + for url, response in mock_requests: + m.get(url, json=response) + + output = _run_read(manifest, config, _stream_name, initial_state) + output_data = [message.record.data for message in output if message.record] + + assert set(tuple(sorted(d.items())) for d in output_data) == set( + tuple(sorted(d.items())) for d in expected_records + ) + final_state = [ + orjson.loads(orjson.dumps(message.state.stream.stream_state)) + for message in output + if message.state + ] + assert final_state[-1] == expected_state + + +def run_incremental_parent_state_test( + manifest, mock_requests, expected_records, initial_state, expected_states +): + """ + Run an incremental parent state test for the specified stream. + + This function performs the following steps: + 1. Mocks the API requests as defined in mock_requests. + 2. Executes the read operation using the provided manifest and config. + 3. Asserts that the output records match the expected records. + 4. Collects intermediate states and records, performing additional reads as necessary. + 5. Compares the cumulative records from each state against the expected records. + 6. Asserts that the final state matches one of the expected states for each run. + + Args: + manifest (dict): The manifest configuration for the stream. + mock_requests (list): A list of tuples containing URL and response data for mocking API requests. + expected_records (list): The expected records to compare against the output. + initial_state (list): The initial state to start the read operation. + expected_states (list): A list of expected final states after the read operation. 
+ """ + _stream_name = "post_comment_votes" + config = { + "start_date": "2024-01-01T00:00:01Z", + "credentials": {"email": "email", "api_token": "api_token"}, + } + + with requests_mock.Mocker() as m: + for url, response in mock_requests: + m.get(url, json=response) + + # Run the initial read + output = _run_read(manifest, config, _stream_name, initial_state) + output_data = [message.record.data for message in output if message.record] + + # Assert that output_data equals expected_records + assert sorted(output_data, key=lambda x: orjson.dumps(x)) == sorted( + expected_records, key=lambda x: orjson.dumps(x) + ) + + # Collect the intermediate states and records produced before each state + cumulative_records = [] + intermediate_states = [] + final_states = [] # To store the final state after each read + + # Store the final state after the initial read + final_state_initial = [ + orjson.loads(orjson.dumps(message.state.stream.stream_state)) + for message in output + if message.state + ] + final_states.append(final_state_initial[-1]) + + for message in output: + if message.type.value == "RECORD": + record_data = message.record.data + cumulative_records.append(record_data) + elif message.type.value == "STATE": + # Record the state and the records produced before this state + state = message.state + records_before_state = cumulative_records.copy() + intermediate_states.append((state, records_before_state)) + + # For each intermediate state, perform another read starting from that state + for state, records_before_state in intermediate_states[:-1]: + output_intermediate = _run_read(manifest, config, _stream_name, [state]) + records_from_state = [ + message.record.data for message in output_intermediate if message.record + ] + + # Combine records produced before the state with records from the new read + cumulative_records_state = records_before_state + records_from_state + + # Duplicates may occur because the state matches the cursor of the last record, causing it to be re-emitted in the next sync. + cumulative_records_state_deduped = list( + {orjson.dumps(record): record for record in cumulative_records_state}.values() + ) + + # Compare the cumulative records with the expected records + expected_records_set = list( + {orjson.dumps(record): record for record in expected_records}.values() + ) + assert ( + sorted(cumulative_records_state_deduped, key=lambda x: orjson.dumps(x)) + == sorted(expected_records_set, key=lambda x: orjson.dumps(x)) + ), f"Records mismatch with intermediate state {state}. Expected {expected_records}, got {cumulative_records_state_deduped}" + + # Store the final state after each intermediate read + final_state_intermediate = [ + orjson.loads(orjson.dumps(message.state.stream.stream_state)) + for message in output_intermediate + if message.state + ] + final_states.append(final_state_intermediate[-1]) + + # Assert that the final state matches the expected state for all runs + for i, final_state in enumerate(final_states): + assert ( + final_state in expected_states + ), f"Final state mismatch at run {i + 1}. 
Expected {expected_states}, got {final_state}" + + +@pytest.mark.parametrize( + "test_name, manifest, mock_requests, expected_records, initial_state, expected_state", + [ + ( + "test_incremental_parent_state", + SUBSTREAM_MANIFEST, + [ + # Fetch the first page of posts + ( + "https://api.example.com/community/posts?per_page=100&start_time=2024-01-05T00:00:00Z", + { + "posts": [ + {"id": 1, "updated_at": "2024-01-30T00:00:00Z"}, + {"id": 2, "updated_at": "2024-01-29T00:00:00Z"}, + ], + "next_page": "https://api.example.com/community/posts?per_page=100&start_time=2024-01-05T00:00:00Z&page=2", + }, + ), + # Fetch the second page of posts + ( + "https://api.example.com/community/posts?per_page=100&start_time=2024-01-05T00:00:00Z&page=2", + {"posts": [{"id": 3, "updated_at": "2024-01-28T00:00:00Z"}]}, + ), + # Fetch the first page of comments for post 1 + ( + "https://api.example.com/community/posts/1/comments?per_page=100", + { + "comments": [ + {"id": 9, "post_id": 1, "updated_at": "2023-01-01T00:00:00Z"}, + {"id": 10, "post_id": 1, "updated_at": "2024-01-25T00:00:00Z"}, + {"id": 11, "post_id": 1, "updated_at": "2024-01-24T00:00:00Z"}, + ], + "next_page": "https://api.example.com/community/posts/1/comments?per_page=100&page=2", + }, + ), + # Fetch the second page of comments for post 1 + ( + "https://api.example.com/community/posts/1/comments?per_page=100&page=2", + {"comments": [{"id": 12, "post_id": 1, "updated_at": "2024-01-23T00:00:00Z"}]}, + ), + # Fetch the first page of votes for comment 10 of post 1 + ( + "https://api.example.com/community/posts/1/comments/10/votes?per_page=100&start_time=2024-01-02T00:00:00Z", + { + "votes": [ + {"id": 100, "comment_id": 10, "created_at": "2024-01-15T00:00:00Z"} + ], + "next_page": "https://api.example.com/community/posts/1/comments/10/votes?per_page=100&page=2&start_time=2024-01-01T00:00:01Z", + }, + ), + # Fetch the second page of votes for comment 10 of post 1 + ( + "https://api.example.com/community/posts/1/comments/10/votes?per_page=100&page=2&start_time=2024-01-01T00:00:01Z", + { + "votes": [ + {"id": 101, "comment_id": 10, "created_at": "2024-01-14T00:00:00Z"} + ] + }, + ), + # Fetch the first page of votes for comment 11 of post 1 + ( + "https://api.example.com/community/posts/1/comments/11/votes?per_page=100&start_time=2024-01-03T00:00:00Z", + { + "votes": [ + {"id": 102, "comment_id": 11, "created_at": "2024-01-13T00:00:00Z"} + ] + }, + ), + # Fetch the first page of votes for comment 12 of post 1 + ( + "https://api.example.com/community/posts/1/comments/12/votes?per_page=100&start_time=2024-01-01T00:00:01Z", + {"votes": []}, + ), + # Fetch the first page of comments for post 2 + ( + "https://api.example.com/community/posts/2/comments?per_page=100", + { + "comments": [ + {"id": 20, "post_id": 2, "updated_at": "2024-01-22T00:00:00Z"} + ], + "next_page": "https://api.example.com/community/posts/2/comments?per_page=100&page=2", + }, + ), + # Fetch the second page of comments for post 2 + ( + "https://api.example.com/community/posts/2/comments?per_page=100&page=2", + {"comments": [{"id": 21, "post_id": 2, "updated_at": "2024-01-21T00:00:00Z"}]}, + ), + # Fetch the first page of votes for comment 20 of post 2 + ( + "https://api.example.com/community/posts/2/comments/20/votes?per_page=100&start_time=2024-01-01T00:00:01Z", + { + "votes": [ + {"id": 200, "comment_id": 20, "created_at": "2024-01-12T00:00:00Z"} + ] + }, + ), + # Fetch the first page of votes for comment 21 of post 2 + ( + 
"https://api.example.com/community/posts/2/comments/21/votes?per_page=100&start_time=2024-01-01T00:00:01Z", + { + "votes": [ + {"id": 201, "comment_id": 21, "created_at": "2024-01-12T00:00:15Z"} + ] + }, + ), + # Fetch the first page of comments for post 3 + ( + "https://api.example.com/community/posts/3/comments?per_page=100", + {"comments": [{"id": 30, "post_id": 3, "updated_at": "2024-01-09T00:00:00Z"}]}, + ), + # Fetch the first page of votes for comment 30 of post 3 + ( + "https://api.example.com/community/posts/3/comments/30/votes?per_page=100", + { + "votes": [ + {"id": 300, "comment_id": 30, "created_at": "2024-01-10T00:00:00Z"} + ] + }, + ), + # Requests with intermediate states + # Fetch votes for comment 10 of post 1 + ( + "https://api.example.com/community/posts/1/comments/10/votes?per_page=100&start_time=2024-01-15T00:00:00Z", + { + "votes": [ + {"id": 100, "comment_id": 10, "created_at": "2024-01-15T00:00:00Z"} + ], + }, + ), + # Fetch votes for comment 11 of post 1 + ( + "https://api.example.com/community/posts/1/comments/11/votes?per_page=100&start_time=2024-01-13T00:00:00Z", + { + "votes": [ + {"id": 102, "comment_id": 11, "created_at": "2024-01-13T00:00:00Z"} + ], + }, + ), + # Fetch votes for comment 12 of post 1 + ( + "https://api.example.com/community/posts/1/comments/12/votes?per_page=100&start_time=2024-01-15T00:00:00Z", + { + "votes": [], + }, + ), + # Fetch votes for comment 20 of post 2 + ( + "https://api.example.com/community/posts/2/comments/20/votes?per_page=100&start_time=2024-01-12T00:00:00Z", + { + "votes": [ + {"id": 200, "comment_id": 20, "created_at": "2024-01-12T00:00:00Z"} + ] + }, + ), + # Fetch votes for comment 21 of post 2 + ( + "https://api.example.com/community/posts/2/comments/21/votes?per_page=100&start_time=2024-01-12T00:00:15Z", + { + "votes": [ + {"id": 201, "comment_id": 21, "created_at": "2024-01-12T00:00:15Z"} + ] + }, + ), + ], + # Expected records + [ + {"id": 100, "comment_id": 10, "created_at": "2024-01-15T00:00:00Z"}, + {"id": 101, "comment_id": 10, "created_at": "2024-01-14T00:00:00Z"}, + {"id": 102, "comment_id": 11, "created_at": "2024-01-13T00:00:00Z"}, + {"id": 200, "comment_id": 20, "created_at": "2024-01-12T00:00:00Z"}, + {"id": 201, "comment_id": 21, "created_at": "2024-01-12T00:00:15Z"}, + {"id": 300, "comment_id": 30, "created_at": "2024-01-10T00:00:00Z"}, + ], + # Initial state + [ + AirbyteStateMessage( + type=AirbyteStateType.STREAM, + stream=AirbyteStreamState( + stream_descriptor=StreamDescriptor( + name="post_comment_votes", namespace=None + ), + stream_state=AirbyteStateBlob( + { + "parent_state": { + "post_comments": { + "states": [ + { + "partition": {"id": 1, "parent_slice": {}}, + "cursor": {"updated_at": "2023-01-04T00:00:00Z"}, + } + ], + "parent_state": { + "posts": {"updated_at": "2024-01-05T00:00:00Z"} + }, + } + }, + "states": [ + { + "partition": { + "id": 10, + "parent_slice": {"id": 1, "parent_slice": {}}, + }, + "cursor": {"created_at": "2024-01-02T00:00:00Z"}, + }, + { + "partition": { + "id": 11, + "parent_slice": {"id": 1, "parent_slice": {}}, + }, + "cursor": {"created_at": "2024-01-03T00:00:00Z"}, + }, + ], + } + ), + ), + ) + ], + # Expected state + { + "state": {"created_at": "2024-01-15T00:00:00Z"}, + "parent_state": { + "post_comments": { + "use_global_cursor": False, + "state": {"updated_at": "2024-01-25T00:00:00Z"}, + "parent_state": {"posts": {"updated_at": "2024-01-30T00:00:00Z"}}, + "lookback_window": 1, + "states": [ + { + "partition": {"id": 1, "parent_slice": {}}, + "cursor": 
{"updated_at": "2024-01-25T00:00:00Z"}, + }, + { + "partition": {"id": 2, "parent_slice": {}}, + "cursor": {"updated_at": "2024-01-22T00:00:00Z"}, + }, + { + "partition": {"id": 3, "parent_slice": {}}, + "cursor": {"updated_at": "2024-01-09T00:00:00Z"}, + }, + ], + } + }, + "lookback_window": 1, + "states": [ + { + "partition": {"id": 10, "parent_slice": {"id": 1, "parent_slice": {}}}, + "cursor": {"created_at": "2024-01-15T00:00:00Z"}, + }, + { + "partition": {"id": 11, "parent_slice": {"id": 1, "parent_slice": {}}}, + "cursor": {"created_at": "2024-01-13T00:00:00Z"}, + }, + { + "partition": {"id": 12, "parent_slice": {"id": 1, "parent_slice": {}}}, + "cursor": {"created_at": "2024-01-01T00:00:01Z"}, + }, + { + "partition": {"id": 20, "parent_slice": {"id": 2, "parent_slice": {}}}, + "cursor": {"created_at": "2024-01-12T00:00:00Z"}, + }, + { + "partition": {"id": 21, "parent_slice": {"id": 2, "parent_slice": {}}}, + "cursor": {"created_at": "2024-01-12T00:00:15Z"}, + }, + { + "partition": {"id": 30, "parent_slice": {"id": 3, "parent_slice": {}}}, + "cursor": {"created_at": "2024-01-10T00:00:00Z"}, + }, + ], + }, + ), + ], +) +def test_incremental_parent_state( + test_name, manifest, mock_requests, expected_records, initial_state, expected_state +): + additional_expected_state = copy.deepcopy(expected_state) + # State for empty partition (comment 12), when the global cursor is used for intermediate states + empty_state = { + "cursor": {"created_at": "2024-01-01T00:00:01Z"}, + "partition": {"id": 12, "parent_slice": {"id": 1, "parent_slice": {}}}, + } + additional_expected_state["states"].append(empty_state) + print(manifest) + run_incremental_parent_state_test( + manifest, + mock_requests, + expected_records, + initial_state, + [expected_state, additional_expected_state], + ) + + +@pytest.mark.parametrize( + "test_name, manifest, mock_requests, expected_records, initial_state, expected_state", + [ + ( + "test_incremental_parent_state", + SUBSTREAM_MANIFEST, + [ + # Fetch the first page of posts + ( + "https://api.example.com/community/posts?per_page=100&start_time=2024-01-02T00:00:00Z", + { + "posts": [ + {"id": 1, "updated_at": "2024-01-30T00:00:00Z"}, + {"id": 2, "updated_at": "2024-01-29T00:00:00Z"}, + ], + "next_page": "https://api.example.com/community/posts?per_page=100&start_time=2024-01-02T00:00:00Z&page=2", + }, + ), + # Fetch the second page of posts + ( + "https://api.example.com/community/posts?per_page=100&start_time=2024-01-02T00:00:00Z&page=2", + {"posts": [{"id": 3, "updated_at": "2024-01-28T00:00:00Z"}]}, + ), + # Fetch the first page of comments for post 1 + ( + "https://api.example.com/community/posts/1/comments?per_page=100", + { + "comments": [ + {"id": 9, "post_id": 1, "updated_at": "2023-01-01T00:00:00Z"}, + {"id": 10, "post_id": 1, "updated_at": "2024-01-25T00:00:00Z"}, + {"id": 11, "post_id": 1, "updated_at": "2024-01-24T00:00:00Z"}, + ], + "next_page": "https://api.example.com/community/posts/1/comments?per_page=100&page=2", + }, + ), + # Fetch the second page of comments for post 1 + ( + "https://api.example.com/community/posts/1/comments?per_page=100&page=2", + {"comments": [{"id": 12, "post_id": 1, "updated_at": "2024-01-23T00:00:00Z"}]}, + ), + # Fetch the first page of votes for comment 10 of post 1 + ( + "https://api.example.com/community/posts/1/comments/10/votes?per_page=100&start_time=2024-01-02T00:00:00Z", + { + "votes": [ + {"id": 100, "comment_id": 10, "created_at": "2024-01-15T00:00:00Z"} + ], + "next_page": 
"https://api.example.com/community/posts/1/comments/10/votes?per_page=100&page=2&start_time=2024-01-02T00:00:00Z", + }, + ), + # Fetch the second page of votes for comment 10 of post 1 + ( + "https://api.example.com/community/posts/1/comments/10/votes?per_page=100&page=2&start_time=2024-01-02T00:00:00Z", + { + "votes": [ + {"id": 101, "comment_id": 10, "created_at": "2024-01-14T00:00:00Z"} + ] + }, + ), + # Fetch the first page of votes for comment 11 of post 1 + ( + "https://api.example.com/community/posts/1/comments/11/votes?per_page=100&start_time=2024-01-02T00:00:00Z", + { + "votes": [ + {"id": 102, "comment_id": 11, "created_at": "2024-01-13T00:00:00Z"} + ] + }, + ), + # Fetch the first page of votes for comment 12 of post 1 + ( + "https://api.example.com/community/posts/1/comments/12/votes?per_page=100&start_time=2024-01-02T00:00:00Z", + {"votes": []}, + ), + # Fetch the first page of comments for post 2 + ( + "https://api.example.com/community/posts/2/comments?per_page=100", + { + "comments": [ + {"id": 20, "post_id": 2, "updated_at": "2024-01-22T00:00:00Z"} + ], + "next_page": "https://api.example.com/community/posts/2/comments?per_page=100&page=2", + }, + ), + # Fetch the second page of comments for post 2 + ( + "https://api.example.com/community/posts/2/comments?per_page=100&page=2", + {"comments": [{"id": 21, "post_id": 2, "updated_at": "2024-01-21T00:00:00Z"}]}, + ), + # Fetch the first page of votes for comment 20 of post 2 + ( + "https://api.example.com/community/posts/2/comments/20/votes?per_page=100&start_time=2024-01-02T00:00:00Z", + { + "votes": [ + {"id": 200, "comment_id": 20, "created_at": "2024-01-12T00:00:00Z"} + ] + }, + ), + # Fetch the first page of votes for comment 21 of post 2 + ( + "https://api.example.com/community/posts/2/comments/21/votes?per_page=100&start_time=2024-01-02T00:00:00Z", + { + "votes": [ + {"id": 201, "comment_id": 21, "created_at": "2024-01-12T00:00:15Z"} + ] + }, + ), + # Fetch the first page of comments for post 3 + ( + "https://api.example.com/community/posts/3/comments?per_page=100", + {"comments": [{"id": 30, "post_id": 3, "updated_at": "2024-01-09T00:00:00Z"}]}, + ), + # Fetch the first page of votes for comment 30 of post 3 + ( + "https://api.example.com/community/posts/3/comments/30/votes?per_page=100&start_time=2024-01-02T00:00:00Z", + { + "votes": [ + {"id": 300, "comment_id": 30, "created_at": "2024-01-10T00:00:00Z"} + ] + }, + ), + ], + # Expected records + [ + {"id": 100, "comment_id": 10, "created_at": "2024-01-15T00:00:00Z"}, + {"id": 101, "comment_id": 10, "created_at": "2024-01-14T00:00:00Z"}, + {"id": 102, "comment_id": 11, "created_at": "2024-01-13T00:00:00Z"}, + {"id": 200, "comment_id": 20, "created_at": "2024-01-12T00:00:00Z"}, + {"id": 201, "comment_id": 21, "created_at": "2024-01-12T00:00:15Z"}, + {"id": 300, "comment_id": 30, "created_at": "2024-01-10T00:00:00Z"}, + ], + # Initial state + [ + AirbyteStateMessage( + type=AirbyteStateType.STREAM, + stream=AirbyteStreamState( + stream_descriptor=StreamDescriptor( + name="post_comment_votes", namespace=None + ), + stream_state=AirbyteStateBlob({"created_at": "2024-01-02T00:00:00Z"}), + ), + ) + ], + # Expected state + { + "state": {"created_at": "2024-01-15T00:00:00Z"}, + "parent_state": { + "post_comments": { + "use_global_cursor": False, + "state": {"updated_at": "2024-01-25T00:00:00Z"}, + "parent_state": {"posts": {"updated_at": "2024-01-30T00:00:00Z"}}, + "lookback_window": 1, + "states": [ + { + "partition": {"id": 1, "parent_slice": {}}, + "cursor": {"updated_at": 
"2024-01-25T00:00:00Z"}, + }, + { + "partition": {"id": 2, "parent_slice": {}}, + "cursor": {"updated_at": "2024-01-22T00:00:00Z"}, + }, + { + "partition": {"id": 3, "parent_slice": {}}, + "cursor": {"updated_at": "2024-01-09T00:00:00Z"}, + }, + ], + } + }, + "lookback_window": 1, + "states": [ + { + "partition": {"id": 10, "parent_slice": {"id": 1, "parent_slice": {}}}, + "cursor": {"created_at": "2024-01-15T00:00:00Z"}, + }, + { + "partition": {"id": 11, "parent_slice": {"id": 1, "parent_slice": {}}}, + "cursor": {"created_at": "2024-01-13T00:00:00Z"}, + }, + { + "partition": {"id": 12, "parent_slice": {"id": 1, "parent_slice": {}}}, + "cursor": {"created_at": "2024-01-02T00:00:00Z"}, + }, + { + "partition": {"id": 20, "parent_slice": {"id": 2, "parent_slice": {}}}, + "cursor": {"created_at": "2024-01-12T00:00:00Z"}, + }, + { + "partition": {"id": 21, "parent_slice": {"id": 2, "parent_slice": {}}}, + "cursor": {"created_at": "2024-01-12T00:00:15Z"}, + }, + { + "partition": {"id": 30, "parent_slice": {"id": 3, "parent_slice": {}}}, + "cursor": {"created_at": "2024-01-10T00:00:00Z"}, + }, + ], + }, + ), + ], +) +def test_incremental_parent_state_migration( + test_name, manifest, mock_requests, expected_records, initial_state, expected_state +): + """ + Test incremental partition router with parent state migration + """ + _stream_name = "post_comment_votes" + config = { + "start_date": "2024-01-01T00:00:01Z", + "credentials": {"email": "email", "api_token": "api_token"}, + } + + with requests_mock.Mocker() as m: + for url, response in mock_requests: + m.get(url, json=response) + + output = _run_read(manifest, config, _stream_name, initial_state) + output_data = [message.record.data for message in output if message.record] + + assert set(tuple(sorted(d.items())) for d in output_data) == set( + tuple(sorted(d.items())) for d in expected_records + ) + final_state = [ + orjson.loads(orjson.dumps(message.state.stream.stream_state)) + for message in output + if message.state + ] + assert final_state[-1] == expected_state + + +@pytest.mark.parametrize( + "test_name, manifest, mock_requests, expected_records, initial_state, expected_state", + [ + ( + "test_incremental_parent_state", + SUBSTREAM_MANIFEST, + [ + # Fetch the first page of posts + ( + "https://api.example.com/community/posts?per_page=100&start_time=2024-01-05T00:00:00Z", + { + "posts": [], + "next_page": "https://api.example.com/community/posts?per_page=100&start_time=2024-01-05T00:00:00Z&page=2", + }, + ), + # Fetch the second page of posts + ( + "https://api.example.com/community/posts?per_page=100&start_time=2024-01-05T00:00:00Z&page=2", + {"posts": []}, + ), + # Fetch the first page of comments for post 1 + ( + "https://api.example.com/community/posts/1/comments?per_page=100", + { + "comments": [], + "next_page": "https://api.example.com/community/posts/1/comments?per_page=100&page=2", + }, + ), + # Fetch the second page of comments for post 1 + ( + "https://api.example.com/community/posts/1/comments?per_page=100&page=2", + {"comments": []}, + ), + # Fetch the first page of votes for comment 10 of post 1 + ( + "https://api.example.com/community/posts/1/comments/10/votes?per_page=100&start_time=2024-01-02T00:00:00Z", + { + "votes": [], + "next_page": "https://api.example.com/community/posts/1/comments/10/votes?per_page=100&page=2&start_time=2024-01-01T00:00:01Z", + }, + ), + # Fetch the second page of votes for comment 10 of post 1 + ( + 
"https://api.example.com/community/posts/1/comments/10/votes?per_page=100&page=2&start_time=2024-01-01T00:00:01Z", + {"votes": []}, + ), + # Fetch the first page of votes for comment 11 of post 1 + ( + "https://api.example.com/community/posts/1/comments/11/votes?per_page=100&start_time=2024-01-03T00:00:00Z", + {"votes": []}, + ), + # Fetch the first page of votes for comment 12 of post 1 + ( + "https://api.example.com/community/posts/1/comments/12/votes?per_page=100&start_time=2024-01-01T00:00:01Z", + {"votes": []}, + ), + # Fetch the first page of comments for post 2 + ( + "https://api.example.com/community/posts/2/comments?per_page=100", + { + "comments": [], + "next_page": "https://api.example.com/community/posts/2/comments?per_page=100&page=2", + }, + ), + # Fetch the second page of comments for post 2 + ( + "https://api.example.com/community/posts/2/comments?per_page=100&page=2", + {"comments": []}, + ), + # Fetch the first page of votes for comment 20 of post 2 + ( + "https://api.example.com/community/posts/2/comments/20/votes?per_page=100&start_time=2024-01-01T00:00:01Z", + {"votes": []}, + ), + # Fetch the first page of votes for comment 21 of post 2 + ( + "https://api.example.com/community/posts/2/comments/21/votes?per_page=100&start_time=2024-01-01T00:00:01Z", + {"votes": []}, + ), + # Fetch the first page of comments for post 3 + ( + "https://api.example.com/community/posts/3/comments?per_page=100", + {"comments": []}, + ), + # Fetch the first page of votes for comment 30 of post 3 + ( + "https://api.example.com/community/posts/3/comments/30/votes?per_page=100", + {"votes": []}, + ), + ], + # Expected records + [], + # Initial state + [ + AirbyteStateMessage( + type=AirbyteStateType.STREAM, + stream=AirbyteStreamState( + stream_descriptor=StreamDescriptor( + name="post_comment_votes", namespace=None + ), + stream_state=AirbyteStateBlob( + { + "parent_state": { + "post_comments": { + "states": [ + { + "partition": {"id": 1, "parent_slice": {}}, + "cursor": {"updated_at": "2023-01-04T00:00:00Z"}, + } + ], + "parent_state": { + "posts": {"updated_at": "2024-01-05T00:00:00Z"} + }, + } + }, + "states": [ + { + "partition": { + "id": 10, + "parent_slice": {"id": 1, "parent_slice": {}}, + }, + "cursor": {"created_at": "2024-01-02T00:00:00Z"}, + }, + { + "partition": { + "id": 11, + "parent_slice": {"id": 1, "parent_slice": {}}, + }, + "cursor": {"created_at": "2024-01-03T00:00:00Z"}, + }, + ], + "state": {"created_at": "2024-01-03T00:00:00Z"}, + "lookback_window": 1, + } + ), + ), + ) + ], + # Expected state + { + "lookback_window": 1, + "state": {"created_at": "2024-01-03T00:00:00Z"}, + "parent_state": { + "post_comments": { + "use_global_cursor": False, + "state": {}, + "parent_state": {"posts": {"updated_at": "2024-01-05T00:00:00Z"}}, + "states": [ + { + "partition": {"id": 1, "parent_slice": {}}, + "cursor": {"updated_at": "2023-01-04T00:00:00Z"}, + } + ], + } + }, + "states": [ + { + "partition": {"id": 10, "parent_slice": {"id": 1, "parent_slice": {}}}, + "cursor": {"created_at": "2024-01-02T00:00:00Z"}, + }, + { + "partition": {"id": 11, "parent_slice": {"id": 1, "parent_slice": {}}}, + "cursor": {"created_at": "2024-01-03T00:00:00Z"}, + }, + ], + }, + ), + ], +) +def test_incremental_parent_state_no_slices( + test_name, manifest, mock_requests, expected_records, initial_state, expected_state +): + """ + Test incremental partition router with no parent records + """ + _stream_name = "post_comment_votes" + config = { + "start_date": "2024-01-01T00:00:01Z", + "credentials": 
{"email": "email", "api_token": "api_token"}, + } + + with requests_mock.Mocker() as m: + for url, response in mock_requests: + m.get(url, json=response) + + output = _run_read(manifest, config, _stream_name, initial_state) + output_data = [message.record.data for message in output if message.record] + + assert output_data == expected_records + final_state = [ + orjson.loads(orjson.dumps(message.state.stream.stream_state)) + for message in output + if message.state + ] + assert final_state[-1] == expected_state + + +@pytest.mark.parametrize( + "test_name, manifest, mock_requests, expected_records, initial_state, expected_state", + [ + ( + "test_incremental_parent_state", + SUBSTREAM_MANIFEST, + [ + # Fetch the first page of posts + ( + "https://api.example.com/community/posts?per_page=100&start_time=2024-01-05T00:00:00Z", + { + "posts": [ + {"id": 1, "updated_at": "2024-01-30T00:00:00Z"}, + {"id": 2, "updated_at": "2024-01-29T00:00:00Z"}, + ], + "next_page": "https://api.example.com/community/posts?per_page=100&start_time=2024-01-05T00:00:00Z&page=2", + }, + ), + # Fetch the second page of posts + ( + "https://api.example.com/community/posts?per_page=100&start_time=2024-01-05T00:00:00Z&page=2", + {"posts": [{"id": 3, "updated_at": "2024-01-28T00:00:00Z"}]}, + ), + # Fetch the first page of comments for post 1 + ( + "https://api.example.com/community/posts/1/comments?per_page=100", + { + "comments": [ + {"id": 9, "post_id": 1, "updated_at": "2023-01-01T00:00:00Z"}, + {"id": 10, "post_id": 1, "updated_at": "2024-01-25T00:00:00Z"}, + {"id": 11, "post_id": 1, "updated_at": "2024-01-24T00:00:00Z"}, + ], + "next_page": "https://api.example.com/community/posts/1/comments?per_page=100&page=2", + }, + ), + # Fetch the second page of comments for post 1 + ( + "https://api.example.com/community/posts/1/comments?per_page=100&page=2", + {"comments": [{"id": 12, "post_id": 1, "updated_at": "2024-01-23T00:00:00Z"}]}, + ), + # Fetch the first page of votes for comment 10 of post 1 + ( + "https://api.example.com/community/posts/1/comments/10/votes?per_page=100&start_time=2024-01-02T00:00:00Z", + { + "votes": [], + "next_page": "https://api.example.com/community/posts/1/comments/10/votes?per_page=100&page=2&start_time=2024-01-01T00:00:01Z", + }, + ), + # Fetch the second page of votes for comment 10 of post 1 + ( + "https://api.example.com/community/posts/1/comments/10/votes?per_page=100&page=2&start_time=2024-01-01T00:00:01Z", + {"votes": []}, + ), + # Fetch the first page of votes for comment 11 of post 1 + ( + "https://api.example.com/community/posts/1/comments/11/votes?per_page=100&start_time=2024-01-03T00:00:00Z", + {"votes": []}, + ), + # Fetch the first page of votes for comment 12 of post 1 + ( + "https://api.example.com/community/posts/1/comments/12/votes?per_page=100&start_time=2024-01-03T00:00:00Z", + {"votes": []}, + ), + # Fetch the first page of comments for post 2 + ( + "https://api.example.com/community/posts/2/comments?per_page=100", + { + "comments": [ + {"id": 20, "post_id": 2, "updated_at": "2024-01-22T00:00:00Z"} + ], + "next_page": "https://api.example.com/community/posts/2/comments?per_page=100&page=2", + }, + ), + # Fetch the second page of comments for post 2 + ( + "https://api.example.com/community/posts/2/comments?per_page=100&page=2", + {"comments": [{"id": 21, "post_id": 2, "updated_at": "2024-01-21T00:00:00Z"}]}, + ), + # Fetch the first page of votes for comment 20 of post 2 + ( + 
"https://api.example.com/community/posts/2/comments/20/votes?per_page=100&start_time=2024-01-03T00:00:00Z", + {"votes": []}, + ), + # Fetch the first page of votes for comment 21 of post 2 + ( + "https://api.example.com/community/posts/2/comments/21/votes?per_page=100&start_time=2024-01-03T00:00:00Z", + {"votes": []}, + ), + # Fetch the first page of comments for post 3 + ( + "https://api.example.com/community/posts/3/comments?per_page=100", + {"comments": [{"id": 30, "post_id": 3, "updated_at": "2024-01-09T00:00:00Z"}]}, + ), + # Fetch the first page of votes for comment 30 of post 3 + ( + "https://api.example.com/community/posts/3/comments/30/votes?per_page=100", + {"votes": []}, + ), + ], + # Expected records + [], + # Initial state + [ + AirbyteStateMessage( + type=AirbyteStateType.STREAM, + stream=AirbyteStreamState( + stream_descriptor=StreamDescriptor( + name="post_comment_votes", namespace=None + ), + stream_state=AirbyteStateBlob( + { + "parent_state": { + "post_comments": { + "states": [ + { + "partition": {"id": 1, "parent_slice": {}}, + "cursor": {"updated_at": "2023-01-04T00:00:00Z"}, + } + ], + "parent_state": { + "posts": {"updated_at": "2024-01-05T00:00:00Z"} + }, + } + }, + "states": [ + { + "partition": { + "id": 10, + "parent_slice": {"id": 1, "parent_slice": {}}, + }, + "cursor": {"created_at": "2024-01-02T00:00:00Z"}, + }, + { + "partition": { + "id": 11, + "parent_slice": {"id": 1, "parent_slice": {}}, + }, + "cursor": {"created_at": "2024-01-03T00:00:00Z"}, + }, + ], + "use_global_cursor": True, + "state": {"created_at": "2024-01-03T00:00:00Z"}, + "lookback_window": 0, + } + ), + ), + ) + ], + # Expected state + { + "lookback_window": 1, + "state": {"created_at": "2024-01-03T00:00:00Z"}, + "states": [ + { + "partition": { + "id": 10, + "parent_slice": {"id": 1, "parent_slice": {}}, + }, + "cursor": {"created_at": "2024-01-02T00:00:00Z"}, + }, + { + "partition": { + "id": 11, + "parent_slice": {"id": 1, "parent_slice": {}}, + }, + "cursor": {"created_at": "2024-01-03T00:00:00Z"}, + }, + {'cursor': {'created_at': '2024-01-03T00:00:00Z'}, + 'partition': {'id': 12, + 'parent_slice': {'id': 1, 'parent_slice': {}}}}, + {'cursor': {'created_at': '2024-01-03T00:00:00Z'}, + 'partition': {'id': 20, + 'parent_slice': {'id': 2, 'parent_slice': {}}}}, + {'cursor': {'created_at': '2024-01-03T00:00:00Z'}, + 'partition': {'id': 21, + 'parent_slice': {'id': 2, 'parent_slice': {}}}}, + {'cursor': {'created_at': '2024-01-03T00:00:00Z'}, + 'partition': {'id': 30, + 'parent_slice': {'id': 3, 'parent_slice': {}}}} + ], + "parent_state": { + "post_comments": { + "use_global_cursor": False, + "state": {"updated_at": "2024-01-25T00:00:00Z"}, + "parent_state": {"posts": {"updated_at": "2024-01-30T00:00:00Z"}}, + "lookback_window": 1, + "states": [ + { + "partition": {"id": 1, "parent_slice": {}}, + "cursor": {"updated_at": "2024-01-25T00:00:00Z"}, + }, + { + "partition": {"id": 2, "parent_slice": {}}, + "cursor": {"updated_at": "2024-01-22T00:00:00Z"}, + }, + { + "partition": {"id": 3, "parent_slice": {}}, + "cursor": {"updated_at": "2024-01-09T00:00:00Z"}, + }, + ], + } + }, + }, + ), + ], +) +def test_incremental_parent_state_no_records( + test_name, manifest, mock_requests, expected_records, initial_state, expected_state +): + """ + Test incremental partition router with no child records + """ + _stream_name = "post_comment_votes" + config = { + "start_date": "2024-01-01T00:00:01Z", + "credentials": {"email": "email", "api_token": "api_token"}, + } + + with requests_mock.Mocker() as m: + 
for url, response in mock_requests: + m.get(url, json=response) + + output = _run_read(manifest, config, _stream_name, initial_state) + output_data = [message.record.data for message in output if message.record] + + assert output_data == expected_records + final_state = [ + orjson.loads(orjson.dumps(message.state.stream.stream_state)) + for message in output + if message.state + ] + assert final_state[-1] == expected_state + + +@pytest.mark.parametrize( + "test_name, manifest, mock_requests, expected_records, initial_state, expected_state", + [ + ( + "test_incremental_parent_state", + SUBSTREAM_MANIFEST, + [ + # Fetch the first page of posts + ( + "https://api.example.com/community/posts?per_page=100&start_time=2024-01-05T00:00:00Z", + { + "posts": [ + {"id": 1, "updated_at": "2024-01-30T00:00:00Z"}, + {"id": 2, "updated_at": "2024-01-29T00:00:00Z"}, + ], + "next_page": "https://api.example.com/community/posts?per_page=100&start_time=2024-01-05T00:00:00Z&page=2", + }, + ), + # Fetch the second page of posts + ( + "https://api.example.com/community/posts?per_page=100&start_time=2024-01-05T00:00:00Z&page=2", + {"posts": [{"id": 3, "updated_at": "2024-01-28T00:00:00Z"}]}, + ), + # Fetch the first page of comments for post 1 + ( + "https://api.example.com/community/posts/1/comments?per_page=100", + { + "comments": [ + {"id": 9, "post_id": 1, "updated_at": "2023-01-01T00:00:00Z"}, + {"id": 10, "post_id": 1, "updated_at": "2024-01-25T00:00:00Z"}, + {"id": 11, "post_id": 1, "updated_at": "2024-01-24T00:00:00Z"}, + ], + "next_page": "https://api.example.com/community/posts/1/comments?per_page=100&page=2", + }, + ), + # Fetch the second page of comments for post 1 + ( + "https://api.example.com/community/posts/1/comments?per_page=100&page=2", + {"comments": [{"id": 12, "post_id": 1, "updated_at": "2024-01-23T00:00:00Z"}]}, + ), + # Fetch the first page of votes for comment 10 of post 1 + ( + "https://api.example.com/community/posts/1/comments/10/votes?per_page=100&start_time=2024-01-02T00:00:00Z", + { + "votes": [ + {"id": 100, "comment_id": 10, "created_at": "2024-01-15T00:00:00Z"} + ], + "next_page": "https://api.example.com/community/posts/1/comments/10/votes?per_page=100&page=2&start_time=2024-01-01T00:00:01Z", + }, + ), + # Fetch the second page of votes for comment 10 of post 1 + ( + "https://api.example.com/community/posts/1/comments/10/votes?per_page=100&page=2&start_time=2024-01-01T00:00:01Z", + { + "votes": [ + {"id": 101, "comment_id": 10, "created_at": "2024-01-14T00:00:00Z"} + ] + }, + ), + # Fetch the first page of votes for comment 11 of post 1 + ( + "https://api.example.com/community/posts/1/comments/11/votes?per_page=100&start_time=2024-01-03T00:00:00Z", + { + "votes": [ + {"id": 102, "comment_id": 11, "created_at": "2024-01-13T00:00:00Z"} + ] + }, + ), + # Fetch the first page of votes for comment 12 of post 1 + ( + "https://api.example.com/community/posts/1/comments/12/votes?per_page=100&start_time=2024-01-01T00:00:01Z", + {"votes": []}, + ), + # Fetch the first page of comments for post 2 + ( + "https://api.example.com/community/posts/2/comments?per_page=100", + { + "comments": [ + {"id": 20, "post_id": 2, "updated_at": "2024-01-22T00:00:00Z"} + ], + "next_page": "https://api.example.com/community/posts/2/comments?per_page=100&page=2", + }, + ), + # Fetch the second page of comments for post 2 + ( + "https://api.example.com/community/posts/2/comments?per_page=100&page=2", + {"comments": [{"id": 21, "post_id": 2, "updated_at": "2024-01-21T00:00:00Z"}]}, + ), + # Fetch the first 
page of votes for comment 20 of post 2 + ( + "https://api.example.com/community/posts/2/comments/20/votes?per_page=100&start_time=2024-01-01T00:00:01Z", + { + "status_code": 500, + "json": {"error": "Internal Server Error"}, + }, + ), + # Fetch the first page of votes for comment 21 of post 2 + ( + "https://api.example.com/community/posts/2/comments/21/votes?per_page=100&start_time=2024-01-01T00:00:01Z", + { + "votes": [ + {"id": 201, "comment_id": 21, "created_at": "2024-01-12T00:00:15Z"} + ] + }, + ), + # Fetch the first page of comments for post 3 + ( + "https://api.example.com/community/posts/3/comments?per_page=100", + {"comments": [{"id": 30, "post_id": 3, "updated_at": "2024-01-09T00:00:00Z"}]}, + ), + # Fetch the first page of votes for comment 30 of post 3 + ( + "https://api.example.com/community/posts/3/comments/30/votes?per_page=100", + { + "votes": [ + {"id": 300, "comment_id": 30, "created_at": "2024-01-10T00:00:00Z"} + ] + }, + ), + ], + # Expected records + [ + {"id": 100, "comment_id": 10, "created_at": "2024-01-15T00:00:00Z"}, + {"id": 101, "comment_id": 10, "created_at": "2024-01-14T00:00:00Z"}, + {"id": 102, "comment_id": 11, "created_at": "2024-01-13T00:00:00Z"}, + {"id": 201, "comment_id": 21, "created_at": "2024-01-12T00:00:15Z"}, + {"id": 300, "comment_id": 30, "created_at": "2024-01-10T00:00:00Z"}, + ], + # Initial state + [ + AirbyteStateMessage( + type=AirbyteStateType.STREAM, + stream=AirbyteStreamState( + stream_descriptor=StreamDescriptor( + name="post_comment_votes", namespace=None + ), + stream_state=AirbyteStateBlob( + { + "parent_state": { + "post_comments": { + "states": [ + { + "partition": {"id": 1, "parent_slice": {}}, + "cursor": {"updated_at": "2023-01-04T00:00:00Z"}, + } + ], + "parent_state": { + "posts": {"updated_at": "2024-01-05T00:00:00Z"} + }, + } + }, + "states": [ + { + "partition": { + "id": 10, + "parent_slice": {"id": 1, "parent_slice": {}}, + }, + "cursor": {"created_at": "2024-01-02T00:00:00Z"}, + }, + { + "partition": { + "id": 11, + "parent_slice": {"id": 1, "parent_slice": {}}, + }, + "cursor": {"created_at": "2024-01-03T00:00:00Z"}, + }, + ], + } + ), + ), + ) + ], + # Expected state + { + "state": {"created_at": "2024-01-15T00:00:00Z"}, + "parent_state": { + "post_comments": { + "use_global_cursor": False, + "state": {"updated_at": "2024-01-25T00:00:00Z"}, + "parent_state": {"posts": {"updated_at": "2024-01-30T00:00:00Z"}}, + "lookback_window": 1, + "states": [ + { + "partition": {"id": 1, "parent_slice": {}}, + "cursor": {"updated_at": "2024-01-25T00:00:00Z"}, + }, + { + "partition": {"id": 2, "parent_slice": {}}, + "cursor": {"updated_at": "2024-01-22T00:00:00Z"}, + }, + { + "partition": {"id": 3, "parent_slice": {}}, + "cursor": {"updated_at": "2024-01-09T00:00:00Z"}, + }, + ], + } + }, + "lookback_window": 1, + "states": [ + { + "partition": {"id": 10, "parent_slice": {"id": 1, "parent_slice": {}}}, + "cursor": {"created_at": "2024-01-15T00:00:00Z"}, + }, + { + "partition": {"id": 11, "parent_slice": {"id": 1, "parent_slice": {}}}, + "cursor": {"created_at": "2024-01-13T00:00:00Z"}, + }, + { + "partition": {"id": 12, "parent_slice": {"id": 1, "parent_slice": {}}}, + "cursor": {"created_at": "2024-01-01T00:00:01Z"}, + }, + { + "partition": {"id": 20, "parent_slice": {"id": 2, "parent_slice": {}}}, + "cursor": {"created_at": "2024-01-01T00:00:01Z"}, + }, + { + "partition": {"id": 21, "parent_slice": {"id": 2, "parent_slice": {}}}, + "cursor": {"created_at": "2024-01-12T00:00:15Z"}, + }, + { + "partition": {"id": 30, 
"parent_slice": {"id": 3, "parent_slice": {}}}, + "cursor": {"created_at": "2024-01-10T00:00:00Z"}, + }, + ], + }, + ), + ], +) +def test_incremental_error__parent_state( + test_name, manifest, mock_requests, expected_records, initial_state, expected_state +): + _stream_name = "post_comment_votes" + config = { + "start_date": "2024-01-01T00:00:01Z", + "credentials": {"email": "email", "api_token": "api_token"}, + } + + with requests_mock.Mocker() as m: + for url, response in mock_requests: + m.get(url, json=response) + + output = _run_read(manifest, config, _stream_name, initial_state) + output_data = [message.record.data for message in output if message.record] + + assert set(tuple(sorted(d.items())) for d in output_data) == set( + tuple(sorted(d.items())) for d in expected_records + ) + final_state = [ + orjson.loads(orjson.dumps(message.state.stream.stream_state)) + for message in output + if message.state + ] + assert final_state[-1] == expected_state