Skip to content

Commit

Permalink
Make threshold configurable, display the duplication warning only onc…
Browse files Browse the repository at this point in the history
…e, update the warning message, change the test to check for single warning
  • Loading branch information
burnash committed Nov 10, 2024
1 parent c4c8fa9 commit dd66d42
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 11 deletions.
33 changes: 26 additions & 7 deletions dlt/extract/incremental/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ class Incremental(ItemTransform[TDataItem], BaseConfiguration, Generic[TCursorVa
EMPTY: ClassVar["Incremental[Any]"] = None
placement_affinity: ClassVar[float] = 1 # stick to end

DEFAULT_DUPLICATE_CURSOR_WARNING_THRESHOLD: ClassVar[int] = 200

def __init__(
self,
cursor_path: str = None,
Expand All @@ -129,6 +131,7 @@ def __init__(
allow_external_schedulers: bool = False,
on_cursor_value_missing: OnCursorValueMissing = "raise",
lag: Optional[float] = None,
duplicate_cursor_warning_threshold: Optional[int] = None,
) -> None:
# make sure that path is valid
if cursor_path:
Expand Down Expand Up @@ -165,6 +168,12 @@ def __init__(
self._bound_pipe: SupportsPipe = None
"""Bound pipe"""

self.duplicate_cursor_warning_threshold = (
duplicate_cursor_warning_threshold
if duplicate_cursor_warning_threshold is not None
else self.DEFAULT_DUPLICATE_CURSOR_WARNING_THRESHOLD
)

@property
def primary_key(self) -> Optional[TTableHintTemplate[TColumnNames]]:
return self._primary_key
Expand Down Expand Up @@ -529,18 +538,28 @@ def __call__(self, rows: TDataItems, meta: Any = None) -> Optional[TDataItems]:
transformer.compute_unique_value(row, self.primary_key)
for row in transformer.last_rows
)
initial_hash_count = len(self._cached_state.get("unique_hashes", []))
# add directly computed hashes
unique_hashes.update(transformer.unique_hashes)
self._cached_state["unique_hashes"] = list(unique_hashes)
dedup_count = len(self._cached_state["unique_hashes"])
DEDUP_WARNING_THRESHOLD = 200
if dedup_count > DEDUP_WARNING_THRESHOLD:
logger.warning(
f"There are {dedup_count} records to be deduplicated because"
f" they share the same primary key `{self.primary_key}`."
)
final_hash_count = len(self._cached_state["unique_hashes"])

self._check_duplicate_cursor_threshold(initial_hash_count, final_hash_count)
return rows

def _check_duplicate_cursor_threshold(
self, initial_hash_count: int, final_hash_count: int
) -> None:
if initial_hash_count <= self.duplicate_cursor_warning_threshold < final_hash_count:
logger.warning(
f"Large number of records ({final_hash_count}) sharing the same value of "
f"cursor field '{self.cursor_path}'. This can happen if the cursor "
"field has a low resolution (e.g., only stores dates without times), "
"causing many records to share the same cursor value. "
"Consider using a cursor column with higher resolution to reduce "
"the deduplication state size."
)


Incremental.EMPTY = Incremental[Any]()
Incremental.EMPTY.__is_resolved__ = True
Expand Down
16 changes: 12 additions & 4 deletions tests/extract/test_incremental.py
Original file line number Diff line number Diff line change
Expand Up @@ -3548,15 +3548,23 @@ def test_warning_large_deduplication_state(item_type: TestDataItemFormat, primar
def some_data(
created_at=dlt.sources.incremental("created_at"),
):
# Cross the default threshold of 200
yield data_to_item_format(
item_type,
[{"id": i, "created_at": 1} for i in range(201)],
)
# Second batch adds more items but shouldn't trigger warning
yield data_to_item_format(
item_type,
[{"id": i, "created_at": 1} for i in range(201, 301)],
)

logger_spy = mocker.spy(dlt.common.logger, "warning")
p = dlt.pipeline(pipeline_name=uniq_id())
p.extract(some_data(1))
logger_spy.assert_any_call(
"There are 201 records to be deduplicated because they share the same primary key"
f" `{primary_key}`."
)

# Verify warning was called exactly once
warning_calls = [
call for call in logger_spy.call_args_list if "Large number of records" in call.args[0]
]
assert len(warning_calls) == 1

0 comments on commit dd66d42

Please sign in to comment.