logs warning if deduplication state is large #1877

Merged 5 commits on Nov 13, 2024

17 changes: 17 additions & 0 deletions dlt/extract/incremental/__init__.py
@@ -113,6 +113,7 @@ class Incremental(ItemTransform[TDataItem], BaseConfiguration, Generic[TCursorVa
     allow_external_schedulers: bool = False
     on_cursor_value_missing: OnCursorValueMissing = "raise"
     lag: Optional[float] = None
+    duplicate_cursor_warning_threshold: ClassVar[int] = 200

     # incremental acting as empty
     EMPTY: ClassVar["Incremental[Any]"] = None
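
Because the new threshold is a ClassVar, it is shared by every Incremental instance rather than configured per resource. A minimal sketch of tuning it process-wide (assuming it stays a plain class attribute, as in this diff):

    from dlt.extract.incremental import Incremental

    # Raise the warning threshold for all incremental resources in this process.
    Incremental.duplicate_cursor_warning_threshold = 1000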
@@ -529,12 +530,28 @@ def __call__(self, rows: TDataItems, meta: Any = None) -> Optional[TDataItems]:
                 transformer.compute_unique_value(row, self.primary_key)
                 for row in transformer.last_rows
             )
+            initial_hash_count = len(self._cached_state.get("unique_hashes", []))
             # add directly computed hashes
             unique_hashes.update(transformer.unique_hashes)
             self._cached_state["unique_hashes"] = list(unique_hashes)
+            final_hash_count = len(self._cached_state["unique_hashes"])
+
+            self._check_duplicate_cursor_threshold(initial_hash_count, final_hash_count)
         return rows

+    def _check_duplicate_cursor_threshold(
+        self, initial_hash_count: int, final_hash_count: int
+    ) -> None:
+        if initial_hash_count <= Incremental.duplicate_cursor_warning_threshold < final_hash_count:
+            logger.warning(
+                f"Large number of records ({final_hash_count}) sharing the same value of "
+                f"cursor field '{self.cursor_path}'. This can happen if the cursor "
+                "field has a low resolution (e.g., only stores dates without times), "
+                "causing many records to share the same cursor value. "
+                "Consider using a cursor column with higher resolution to reduce "
+                "the deduplication state size."
+            )


 Incremental.EMPTY = Incremental[Any]()
 Incremental.EMPTY.__is_resolved__ = True
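
The chained comparison above (initial <= threshold < final) makes the warning fire only on the batch that first pushes the hash count past the threshold; later batches that keep growing the state stay silent. A standalone sketch of the same guard, using the default threshold of 200 from this PR (hypothetical crosses_threshold helper):

    DUPLICATE_CURSOR_WARNING_THRESHOLD = 200  # default from this PR

    def crosses_threshold(initial_count: int, final_count: int) -> bool:
        # True only for the batch that first pushes the count past the threshold.
        return initial_count <= DUPLICATE_CURSOR_WARNING_THRESHOLD < final_count

    assert not crosses_threshold(100, 150)  # still under the threshold
    assert crosses_threshold(150, 250)      # crosses it: warn once
    assert not crosses_threshold(250, 350)  # already over: stay silent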
29 changes: 29 additions & 0 deletions tests/extract/test_incremental.py
@@ -3539,3 +3539,32 @@ def test_apply_lag() -> None:
     assert apply_lag(2, 0, 1, max) == 0
     assert apply_lag(1, 2, 1, min) == 2
     assert apply_lag(2, 2, 1, min) == 2
+
+
+@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS)
+@pytest.mark.parametrize("primary_key", ["id", None])
+def test_warning_large_deduplication_state(item_type: TestDataItemFormat, primary_key, mocker):
+    @dlt.resource(primary_key=primary_key)
+    def some_data(
+        created_at=dlt.sources.incremental("created_at"),
+    ):
+        # Cross the default threshold of 200
+        yield data_to_item_format(
+            item_type,
+            [{"id": i, "created_at": 1} for i in range(201)],
+        )
+        # Second batch adds more items but shouldn't trigger warning
+        yield data_to_item_format(
+            item_type,
+            [{"id": i, "created_at": 1} for i in range(201, 301)],
+        )
+
+    logger_spy = mocker.spy(dlt.common.logger, "warning")
+    p = dlt.pipeline(pipeline_name=uniq_id())
+    p.extract(some_data(1))
+
+    # Verify warning was called exactly once
+    warning_calls = [
+        call for call in logger_spy.call_args_list if "Large number of records" in call.args[0]
+    ]
+    assert len(warning_calls) == 1
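
The remediation the warning suggests is a cursor column with higher resolution. A hedged sketch of the difference (hypothetical resources and data; dlt keeps a hash for each row that shares the boundary cursor value, so distinct timestamps keep that state small):

    import dlt

    # Date-only cursor: all 500 rows share "2024-11-13", so each row's hash
    # lands in the deduplication state and the warning threshold is crossed.
    @dlt.resource(primary_key="id")
    def events_by_date(
        created_at=dlt.sources.incremental("created_at"),
    ):
        yield [{"id": i, "created_at": "2024-11-13"} for i in range(500)]

    # Timestamp cursor: values are distinct, so only the rows at the exact
    # boundary value need to be remembered between runs.
    @dlt.resource(primary_key="id")
    def events_by_timestamp(
        created_at=dlt.sources.incremental("created_at"),
    ):
        yield [
            {"id": i, "created_at": f"2024-11-13T12:{i // 60:02d}:{i % 60:02d}"}
            for i in range(500)
        ]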