convert add_limit to pipe step based limiting #2131
base: devel
Conversation
✅ Deploy Preview for dlt-hub-docs ready!
@@ -183,6 +183,17 @@ def check_compat_transformer(name: str, f: AnyFun, sig: inspect.Signature) -> in
    return meta_arg


def wrap_iterator(gen: Iterator[TDataItems]) -> Iterator[TDataItems]:
pure iterators cannot be stopped, so we need to wrap them in a generator that I can control from the limit step.
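A minimal sketch of that idea, under the assumption that nothing beyond the snippet above is needed (the real function types against TDataItems):

from typing import Any, Iterator

def wrap_iterator(gen: Iterator[Any]) -> Iterator[Any]:
    # re-yielding through a generator function makes the stream closable:
    # the limit step can call .close() on the wrapper (raising GeneratorExit
    # inside this loop), which a plain iterator does not support
    for item in gen:
        yield item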
this looks so simple now and I think the implementation is right. two things:
- we have a test failing in a curious way: apparently we call the REST API twice even if the limit is 1. why? we count items at the end of the pipe but there's just a single pipe. we must investigate
- by counting at the end of the pipe we change the behavior. I think it makes sense... but maybe we can add a sticky flag as an argument to add_limit, so people can still stick it to the gen object and count unfiltered items as before?
class LimitItem(ItemTransform[TDataItem]):
    placement_affinity: ClassVar[float] = 1.1  # stick to end right behind incremental
maybe we could have a _singleton flag so only one instance of such type is kept in the pipeline. Another singleton is incremental (we have special code to keep just one). the flag could be recognized and handled by add_step
I have not done this for now, but changed a few convenience methods. The incremental confused me a bit ;)
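For illustration, a hypothetical _singleton flag handled by add_step could look roughly like this; all names below are made up for the sketch and are not the actual dlt API:

from typing import Any, List

class StepSketch:
    # hypothetical marker: at most one step of this concrete type per pipe
    _singleton: bool = False

class PipeSketch:
    def __init__(self) -> None:
        self.steps: List[Any] = []

    def add_step(self, step: Any) -> "PipeSketch":
        if getattr(step, "_singleton", False):
            # drop any previously added step of the same type before appending
            self.steps = [s for s in self.steps if not isinstance(s, type(step))]
        self.steps.append(step)
        return self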
dlt/extract/items.py
        return self

    def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]:
        if self.count == self.max_items:
I'm not sure that is enough. we should close the gen when we reach max items, but in this implementation we close the gen at the end of the pipe. not all steps are sync steps; there are, for example, steps that yield (or maybe even async steps, I do not remember). we should still return None when count > max_items.
I think we need to add more tests:
- what happens if we do add_yield_map?
- are we testing limits for async generators / iterators?
- any differences for round robin / fifo?
- make sure that all expected items are pushed to the transformer (this happens via the special ForkStep)
I think this is all tested now, except for round robin and fifo, but I am quite sure that this will not make a difference, since rr and fifo only apply at the get_source_item level and there is no async stuff going on in the add_limit (it's all taken care of already in other places)
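A simplified sketch of the counting behavior discussed in this thread; the names are illustrative and the real LimitItem in dlt/extract/items.py does more (binding to the pipe, time limits):

from typing import Any, Optional

class LimitSketch:
    def __init__(self, max_items: int) -> None:
        self.max_items = max_items
        self.count = 0
        self.gen: Optional[Any] = None  # wrapped generator, bound elsewhere

    def __call__(self, item: Any, meta: Any = None) -> Optional[Any]:
        self.count += 1
        if self.count == self.max_items and self.gen is not None:
            # reached the limit: close the wrapped generator so upstream stops
            # producing; the max_items-th item itself is still passed through
            self.gen.close()
        if self.count > self.max_items:
            # steps that yield (or async steps) may still push items after the
            # close, so anything past the limit is dropped explicitly
            return None
        return item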
add some convenience methods for pipe step management
    if validator:
-       self.add_step(validator, insert_at=step_no if step_no >= 0 else None)
+       self.add_step(validator)
NOTE: I removed inserting at the same position in favor of automatic resolution via placement affinity. I think this makes more sense; I can revert to the old behavior though.
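For illustration, automatic resolution could boil down to a stable sort on the affinity value; this is a sketch, not the actual add_step logic:

from typing import Any, List

def resolve_placement(steps: List[Any]) -> List[Any]:
    # steps with a higher placement_affinity gravitate toward the end of the
    # pipe; the stable sort keeps insertion order for steps with equal affinity
    return sorted(steps, key=lambda step: getattr(step, "placement_affinity", 0.0))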
dlt/extract/items.py
class LimitItem(ItemTransform[TDataItem]):
    placement_affinity: ClassVar[float] = 1.1  # stick to end right behind incremental

    def __init__(
I have moved the time limit and rate limiting over from the other PR. I think the time limit is fairly uncontroversial; the rate limiting is a little bit sketchy. I think it would be really cool to implement it with this PR, but we could also add a kind of global rate limiting at the PipeIterator level that gets handed over from the DltSource to the PipeIterator and is applied in the _get_source_item function, so that a new item is only extracted from any pipe if a minimum amount of time has passed.
update: only keeping the time limit here, which is very straightforward to implement and, I think, pretty useful.
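A sketch of how a wall-clock time limit can be expressed as such a transform step; names are illustrative and the real step would also close the wrapped generator:

import time
from typing import Any, Optional

class TimeLimitSketch:
    def __init__(self, max_time: float) -> None:
        self.max_time = max_time  # seconds
        self.start: Optional[float] = None

    def __call__(self, item: Any, meta: Any = None) -> Optional[Any]:
        if self.start is None:
            # start the clock when the first item reaches the step
            self.start = time.monotonic()
        if time.monotonic() - self.start > self.max_time:
            # time budget exhausted: drop everything from here on
            return None
        return item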
related to #2142
Description
Up until now we were managing limits inside a somewhat conflated function that was wrapping generators. The problem was that limits were applied before incrementals were, and that we had a certain amount of code duplication with respect to wrapping async iterators. This PR solves both.
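On the surface add_limit keeps its familiar shape; a minimal usage sketch (the pipeline name and duckdb destination are just examples):

import dlt

@dlt.resource
def endless_events():
    # an endless source for demonstration
    i = 0
    while True:
        yield {"id": i}
        i += 1

# with this PR the limit runs as a pipe step placed behind the incremental
# step, instead of wrapping the resource's generator directly
pipeline = dlt.pipeline(pipeline_name="limit_demo", destination="duckdb")
pipeline.run(endless_events().add_limit(10))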