Skip to content

Commit

Permalink
logs warning if deduplication state is large
Browse files Browse the repository at this point in the history
  • Loading branch information
willi-mueller committed Sep 26, 2024
1 parent 6b7e59d commit cf80f0d
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 1 deletion.
7 changes: 6 additions & 1 deletion dlt/extract/incremental/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,12 @@ def __call__(self, rows: TDataItems, meta: Any = None) -> Optional[TDataItems]:
# add directly computed hashes
unique_hashes.update(transformer.unique_hashes)
self._cached_state["unique_hashes"] = list(unique_hashes)

DEDUP_WARNING_THRESHOLD = 200
if len(self._cached_state["unique_hashes"]) > 200:
logger.warning(
f"There are over {DEDUP_WARNING_THRESHOLD} records to be deduplicated because"
f" they share the same primary key `{self.primary_key}`."
)
return rows


Expand Down
20 changes: 20 additions & 0 deletions tests/extract/test_incremental.py
Original file line number Diff line number Diff line change
Expand Up @@ -2586,3 +2586,23 @@ def updated_is_int(updated_at=dlt.sources.incremental("updated_at", initial_valu
pipeline.run(updated_is_int())
assert isinstance(pip_ex.value.__cause__, IncrementalCursorInvalidCoercion)
assert pip_ex.value.__cause__.cursor_path == "updated_at"


@pytest.mark.parametrize("item_type", ["object"])
@pytest.mark.parametrize("primary_key", ["id", None])
def test_warning_large_deduplication_state(item_type: TestDataItemFormat, primary_key, mocker):
@dlt.resource(primary_key=primary_key)
def some_data(
created_at=dlt.sources.incremental("created_at"),
):
yield data_to_item_format(
item_type,
[{"id": i, "created_at": 1} for i in range(201)],
)

logger_spy = mocker.spy(dlt.common.logger, "warning")
p = dlt.pipeline(pipeline_name=uniq_id())
p.extract(some_data(1))
logger_spy.assert_called_once_with(
f"There are over 200 records to be deduplicated because they share the same primary key `{primary_key}`."
)

0 comments on commit cf80f0d

Please sign in to comment.