diff --git a/dlt/extract/incremental/__init__.py b/dlt/extract/incremental/__init__.py
index 9d73269a20..69af0d68a6 100644
--- a/dlt/extract/incremental/__init__.py
+++ b/dlt/extract/incremental/__init__.py
@@ -113,6 +113,7 @@ class Incremental(ItemTransform[TDataItem], BaseConfiguration, Generic[TCursorVa
     allow_external_schedulers: bool = False
     on_cursor_value_missing: OnCursorValueMissing = "raise"
     lag: Optional[float] = None
+    duplicate_cursor_warning_threshold: ClassVar[int] = 200
 
     # incremental acting as empty
     EMPTY: ClassVar["Incremental[Any]"] = None
@@ -529,12 +530,28 @@ def __call__(self, rows: TDataItems, meta: Any = None) -> Optional[TDataItems]:
                 transformer.compute_unique_value(row, self.primary_key)
                 for row in transformer.last_rows
             )
+            initial_hash_count = len(self._cached_state.get("unique_hashes", []))
             # add directly computed hashes
             unique_hashes.update(transformer.unique_hashes)
             self._cached_state["unique_hashes"] = list(unique_hashes)
+            final_hash_count = len(self._cached_state["unique_hashes"])
+            self._check_duplicate_cursor_threshold(initial_hash_count, final_hash_count)
 
         return rows
 
+    def _check_duplicate_cursor_threshold(
+        self, initial_hash_count: int, final_hash_count: int
+    ) -> None:
+        if initial_hash_count <= Incremental.duplicate_cursor_warning_threshold < final_hash_count:
+            logger.warning(
+                f"Large number of records ({final_hash_count}) sharing the same value of "
+                f"cursor field '{self.cursor_path}'. This can happen if the cursor "
+                "field has a low resolution (e.g., only stores dates without times), "
+                "causing many records to share the same cursor value. "
+                "Consider using a cursor column with higher resolution to reduce "
+                "the deduplication state size."
+            )
+
 
 Incremental.EMPTY = Incremental[Any]()
 Incremental.EMPTY.__is_resolved__ = True
diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py
index 683cea51bc..7ce4228b6c 100644
--- a/tests/extract/test_incremental.py
+++ b/tests/extract/test_incremental.py
@@ -3539,3 +3539,32 @@ def test_apply_lag() -> None:
     assert apply_lag(2, 0, 1, max) == 0
     assert apply_lag(1, 2, 1, min) == 2
     assert apply_lag(2, 2, 1, min) == 2
+
+
+@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS)
+@pytest.mark.parametrize("primary_key", ["id", None])
+def test_warning_large_deduplication_state(item_type: TestDataItemFormat, primary_key, mocker):
+    @dlt.resource(primary_key=primary_key)
+    def some_data(
+        created_at=dlt.sources.incremental("created_at"),
+    ):
+        # Cross the default threshold of 200
+        yield data_to_item_format(
+            item_type,
+            [{"id": i, "created_at": 1} for i in range(201)],
+        )
+        # Second batch adds more items but shouldn't trigger warning
+        yield data_to_item_format(
+            item_type,
+            [{"id": i, "created_at": 1} for i in range(201, 301)],
+        )
+
+    logger_spy = mocker.spy(dlt.common.logger, "warning")
+    p = dlt.pipeline(pipeline_name=uniq_id())
+    p.extract(some_data(1))
+
+    # Verify warning was called exactly once
+    warning_calls = [
+        call for call in logger_spy.call_args_list if "Large number of records" in call.args[0]
+    ]
+    assert len(warning_calls) == 1