
convert add_limit to pipe step based limiting #2131

Open
sh-rp wants to merge 6 commits into devel

Conversation

@sh-rp sh-rp (Collaborator) commented Dec 10, 2024

Description

Up until now we were managing limits inside a somewhat conflated function that wrapped generators. The problem was that limits were applied before incrementals were, and that we had a certain amount of code duplication with respect to wrapping async iterators. This PR solves both problems.
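For illustration, a minimal sketch of the user-facing behavior this refactor preserves; `add_limit` is the existing dlt API, while the resource, pipeline name, and limit value are made up:

```python
import itertools

import dlt


@dlt.resource
def endless():
    # an endless generator; without a limit, extraction would never finish
    yield from itertools.count()


# with step-based limiting, the limit runs as a pipe step placed after
# incremental, so items are counted at the end of the pipe
limited = endless().add_limit(10)

pipeline = dlt.pipeline(pipeline_name="limit_demo", destination="duckdb")
pipeline.run(limited)
```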

@sh-rp sh-rp marked this pull request as ready for review December 10, 2024 15:47

netlify bot commented Dec 10, 2024

Deploy Preview for dlt-hub-docs ready!

🔨 Latest commit: 3738c29
🔍 Latest deploy log: https://app.netlify.com/sites/dlt-hub-docs/deploys/675a93e9c0e3ef0008145eed
😎 Deploy Preview: https://deploy-preview-2131--dlt-hub-docs.netlify.app

@@ -183,6 +183,17 @@ def check_compat_transformer(name: str, f: AnyFun, sig: inspect.Signature) -> in
return meta_arg


def wrap_iterator(gen: Iterator[TDataItems]) -> Iterator[TDataItems]:
sh-rp (Collaborator, Author):

pure iterators cannot be stopped, so we need to wrap them in a generator that we can control from the limit step.
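A minimal sketch of the idea (not necessarily the PR's exact code): re-yield the iterator's items from a generator, so downstream steps get a `close()` handle. `TDataItems` is stubbed here to keep the sketch self-contained:

```python
from typing import Any, Iterator

TDataItems = Any  # stand-in for dlt's actual item type alias


def wrap_iterator(gen: Iterator[TDataItems]) -> Iterator[TDataItems]:
    """Re-yield items from a plain iterator inside a generator.

    A bare iterator exposes no close(); wrapping it in a generator
    gives the limit step something it can stop via GeneratorExit.
    """

    def wrapped() -> Iterator[TDataItems]:
        yield from gen

    return wrapped()
```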

@sh-rp sh-rp linked an issue Dec 10, 2024 that may be closed by this pull request
@rudolfix rudolfix (Collaborator) left a comment:

this looks so simple now and I think the implementation is right. two things:

  • we have a test failing in a curious way: apparently we call the REST API twice even if the limit is 1. why? we count items at the end of the pipe but there's just a single pipe. we must investigate
  • by counting at the end of the pipe we change the behavior. I think it makes sense... but maybe we can add a sticky flag as an argument to add_limit? so people can still stick it to the gen object and count unfiltered items as before



class LimitItem(ItemTransform[TDataItem]):
placement_affinity: ClassVar[float] = 1.1 # stick to end right behind incremental
Collaborator:

maybe we could have a _singleton flag so only one instance of such a type is kept in the pipe. Another singleton is incremental (we have special code to keep just one). the flag could be recognized and handled by add_step (see the sketch below)
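A hypothetical sketch of that proposal; nothing below is in the PR, and the `Pipe` shape, `_steps` list, and flag handling are made up for illustration:

```python
from typing import Any, ClassVar, List


class SingletonStep:
    # hypothetical marker recognized by add_step
    _singleton: ClassVar[bool] = True


class Pipe:
    def __init__(self) -> None:
        self._steps: List[Any] = []

    def add_step(self, step: Any) -> None:
        # if the step type is marked as a singleton, drop any existing
        # instance of the same type before appending the new one
        if getattr(step, "_singleton", False):
            self._steps = [s for s in self._steps if not isinstance(s, type(step))]
        self._steps.append(step)
```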

sh-rp (Collaborator, Author):

I have not done this for now, but I changed a few convenience methods. The incremental confused me a bit ;)

return self

def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]:
if self.count == self.max_items:
Collaborator:

I'm not sure that is enough. we should close the gen when we reach max items, but in this implementation we close it at the end of the pipe. not all steps are sync steps: there are for example steps that yield (or maybe even async steps, I do not remember). we should still return None when count > max_items (see the sketch after this list).

I think we need to add more tests:

  • what happens if we do add_yield_map?
  • are we testing the limit for async generators / iterators?
  • any differences for round robin / fifo?
  • make sure that all expected items are pushed to the transformer (this happens via the special ForkStep)
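One way to implement the "return None past the limit" suggestion, in a minimal self-contained form; the real step derives from dlt's ItemTransform and receives its generator reference during binding, which is simplified away here:

```python
from typing import Any, ClassVar, Generator, Optional

TDataItems = Any  # stand-in for dlt's item type alias


class LimitItem:  # simplified; the real step derives from ItemTransform
    placement_affinity: ClassVar[float] = 1.1  # stick to end right behind incremental

    def __init__(self, max_items: int) -> None:
        self.max_items = max_items
        self.count = 0
        # assumed to be set when the step is bound to the pipe
        self.gen: Optional[Generator[TDataItems, None, None]] = None

    def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]:
        self.count += 1
        # close the wrapped generator once the limit is hit, so the
        # source stops producing new items
        if self.count == self.max_items and self.gen is not None:
            self.gen.close()
        # steps between the generator and this one may still flush
        # buffered items, so also drop anything past the limit
        if self.count > self.max_items:
            return None
        return item
```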

sh-rp (Collaborator, Author):

I think this is all tested now, except for round robin and fifo, but I am quite sure that this will not make a difference, since rr and fifo only apply at the get_source_item level and there is no async stuff going on in add_limit (that is all taken care of in other places already).

add some convenience methods for pipe step management
  if validator:
-     self.add_step(validator, insert_at=step_no if step_no >= 0 else None)
+     self.add_step(validator)
sh-rp (Collaborator, Author):

NOTE: I removed inserting at the same position in favor of automatic resolution via placement affinity. I think this makes more sense; I can revert to the old behavior though.
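A hypothetical sketch of affinity-based placement; the helper name and exact sort rule are made up, and the PR's actual resolution logic may differ:

```python
from typing import Any, List


def insert_by_affinity(steps: List[Any], new_step: Any) -> None:
    """Insert new_step before the first step with a higher affinity.

    Steps without an explicit placement_affinity default to 0.0, so
    high-affinity steps like LimitItem (1.1) naturally drift toward
    the end of the pipe without callers passing insert positions.
    """
    affinity = getattr(new_step, "placement_affinity", 0.0)
    for i, step in enumerate(steps):
        if getattr(step, "placement_affinity", 0.0) > affinity:
            steps.insert(i, new_step)
            return
    steps.append(new_step)
```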

class LimitItem(ItemTransform[TDataItem]):
placement_affinity: ClassVar[float] = 1.1 # stick to end right behind incremental

def __init__(
sh-rp (Collaborator, Author):

I have moved the time limit and rate limiting over from the other PR. I think the time limit is fairly uncontroversial; the rate limiting is a little bit sketchy. It would be really cool to implement it with this PR, but we could also add a kind of global rate limiting at the PipeIterator level that gets handed over from the DltSource to the PipeIterator and is applied in the _get_source_item function, so a new item is only extracted from any pipe if a minimum amount of time has passed.

sh-rp (Collaborator, Author):

update: only keeping the time limit here, which is very straightforward to implement and, I think, pretty useful.
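A minimal sketch of what step-based time limiting can look like; the class shape and the bind() hook mirror the PR's direction but are simplified and not the PR's exact code:

```python
import time
from typing import Any, Optional

TDataItems = Any  # stand-in for dlt's item type alias


class LimitItem:  # simplified; the real step derives from ItemTransform
    def __init__(self, max_items: Optional[int], max_time: Optional[float]) -> None:
        # -1 disables the count limit so only max_time applies
        self.max_items = max_items if max_items is not None else -1
        self.max_time = max_time
        self.count = 0
        self.start_time = 0.0

    def bind(self) -> "LimitItem":
        # the clock starts when the step is bound at extraction time
        self.start_time = time.time()
        self.count = 0
        return self

    def exhausted(self) -> bool:
        # either budget being used up stops the resource
        if self.count == self.max_items:
            return True
        return bool(self.max_time) and time.time() - self.start_time > self.max_time

    def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]:
        self.count += 1
        # drop items once the count or the time budget is exceeded
        return None if self.exhausted() else item
```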

@joscha joscha (Contributor) commented Dec 12, 2024

related to #2142

Successfully merging this pull request may close these issues:

Filesystem Source incremental loading with S3 not working correctly