
Stream implementation is inconvenient #1025

Open · Pyprohly opened this issue Feb 13, 2019 · 18 comments

Labels: Discussion (Open for discussion), Feature (New feature or request)

Comments

@Pyprohly (Contributor)

Many reddit bots operate as a service that needs to stream in new content in real time. Often, we want the bot to be unkillable: if it disconnects from the internet it should try to reconnect continuously; if it finds that it cannot handle a particular item, it’s probably in its best interest to just ignore it. We want the bot to stay alive as much as possible.

To facilitate streaming, PRAW provides a high-level streaming function which lets a bot creator focus more on bot behaviour and less on filtering out older or already-seen items. Unfortunately, due to a lack of exception handling, the stream generator frequently breaks: once an exception propagates out of a generator, the generator is finished, and every further attempt to take items from it raises StopIteration.

In the case of stream_generator, if the stream dies, the bot can no longer continue running its service unless it recreates the stream.
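A minimal, PRAW-free demonstration of this generator behaviour:

def stream():
    yield 1
    raise ValueError('simulated network error')

items = stream()
print(next(items))          # 1
try:
    next(items)             # the ValueError propagates out of the generator
except ValueError:
    pass
print(next(items, 'dead'))  # 'dead' -- the generator is permanently finished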

Current approaches to streaming

Ideally, a stream object should only need to be created once…

import praw
import prawcore

# `subreddit` is assumed to be an existing praw.models.Subreddit instance.
submission_stream = subreddit.stream.submissions(pause_after=None, skip_existing=True)

while True:
    try:
        for submission in submission_stream:
            print(submission)

    except (praw.exceptions.PRAWException, prawcore.exceptions.PrawcoreException) as e:
        print('praw/stream related exception')

    except Exception:
        print('bot related exception')

However, if we take this approach under the current stream implementation, we’d eventually find it to be an unstable setup: the first exception raised inside the stream permanently breaks the generator, and the for loop will never produce items again.

It’s currently more viable to set things up this way:

while True:
    try:
        for submission in subreddit.stream.submissions(pause_after=None, skip_existing=True):
            print(submission)

    except (praw.exceptions.PRAWException, prawcore.exceptions.PrawcoreException) as e:
        print('praw/stream related exception')

    except Exception:
        print('bot related exception')

The bot doesn’t break so easily now because if the stream breaks it’ll just be recreated. The bot is stable and code is manageable so far.

But what if we want to do a double stream? Would a similar approach work?

while True:
    try:
        for submission in subreddit.stream.submissions(pause_after=-1, skip_existing=True):
            if submission is None:
                break
            print(submission)

        for comment in subreddit.stream.comments(pause_after=-1, skip_existing=True):
            if comment is None:
                break
            print(comment)

    except (praw.exceptions.PRAWException, prawcore.exceptions.PrawcoreException) as e:
        print('praw/stream related exception')

    except Exception:
        print('bot related exception')

Turns out the same strategy won’t work here; both streams would yield None practically the whole time, because each pass of the outer loop creates brand-new streams: skip_existing=True discards everything that existed at creation time, and pause_after=-1 makes each stream yield None on its very first empty fetch. If we try to fix this by removing the skip_existing=True then suddenly we’d be dealing with old and duplicate items, which is something that the stream is supposed to be handling for us. We could go back to defining the streams outside the loop, but then we’d face the same problem we had before, where an exception could easily break things.

There are three real options here:

  1. Recreate the streams when an exception happens (and have less manageable code, while being inconsistent with how a single stream is set up).
  2. Put each stream in its own script (and require that multiple scripts be started to achieve the full bot behaviour, rather than it being an option). This is currently the recommended approach as per the docs.
  3. Don’t use streams at all. Filter manually.

Clearly, these are all terrible workarounds. If we want something better, the stream’s implementation has to change.

Designing a better streaming solution

The main problem with our current stream generator is its lack of exception handling.

Since there’s no way to intercept an exception thrown in a generator in the consumer code without the generator breaking, exception handling needs to be written within the generator. At the same time we don’t want the exception handling logic to be predetermined. It’s important that we give the user a way to listen to exceptions that come from the stream. Given stream_generator’s current implementation, this may require more than minor changes to support.
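In outline, the shape of such a generator might look like this (a sketch with hypothetical names, not the actual draft):

def resilient_stream(fetch_batch, on_exception):
    """Yield unseen items forever; forward stream errors to a callback
    instead of letting them escape and kill the generator."""
    seen = set()
    while True:
        try:
            for item in fetch_batch():
                if item.id not in seen:
                    seen.add(item.id)
                    yield item
        except Exception as exception:
            on_exception(exception)  # the generator itself never breaks
        # A real version would also throttle requests between batches.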

If we’re going to change streaming now, there are other inconveniences about the current streaming system that we may as well address…

Since the stream generator is intended to aid in bot making, we’d want to ensure that a new streaming object, if made, would have characteristics that maximise its usefulness in bot making. Namely,

  1. A streaming object shouldn’t need to be recreated.
  2. The stream should be as resilient as possible and should not stop yielding items even after an exception occurs.
  3. Errors related to the stream should preferably be dumped into a separate place from bot related errors.
  4. The stream should avoid yielding items older than when the stream started (skip_existing=True by default).
  5. After the stream starts, it should try its hardest to yield as many items as possible, up to a provided threshold.

With all this in mind, this is how I envision an ideal streaming program to look:

# Wishful thinking: the new stream object is returned
#submission_stream = subreddit.stream.submissions()
#comment_stream = subreddit.stream.comments()
submission_stream = Stream(subreddit.new)
comment_stream = Stream(subreddit.comments)

@submission_stream.handler()
@comment_stream.handler()
def error_handler(exception):
    print('praw/stream related exception')

while True:
    try:
        for submission in submission_stream:
            if submission is None:
                break
            print(submission)

        for comment in comment_stream:
            if comment is None:
                break
            print(comment)

    except Exception:
        print('bot related exception')

If this looks at all promising, please see and try out my Stream class draft here.

@jarhill0 added the Feature label Feb 13, 2019
@bboe (Member) commented Feb 13, 2019

I love this suggestion. Correct me if I'm wrong, but wouldn't there be no need to put the iterators for the submission and comment streams inside the try/except? E.g., would this work instead?

while True:
    for submission in submission_stream:
        if submission is None:
            break
        try:
            print(submission)
            ...
        except Exception:
            pass  # submission processing exception

    for comment in comment_stream:
        if comment is None:
            break
        try:
            print(comment)
            ...
        except Exception:
            pass  # comment processing exception

My only other concern has to do with the asynchronous nature of the approach. I personally like it, but I worry it might be a bit confusing to new-to-programming users. That said, perhaps how it handles exceptions actually makes it easier for those users, and only the users who care enough to do something with the exceptions need to figure it out.

Thoughts? Thanks for the suggestions.

@Pyprohly (Contributor, Author) commented Feb 14, 2019

Yes, there’s no longer a strong need to do exception handling on the outside of the iterators, and it may make a little more sense to have the try/except on the inside of each stream loop. The exact exception handling written here isn’t too important… the way I see it, having such a generic try/except at this level is simply a last-ditch effort to keep the bot alive if something goes wrong. More localised exception handling is, of course, encouraged as soon as those sources can be identified.

I don’t think exception handling this high up should need to be shown in the docs anyway (to reduce distractions), but if it is, I’m perfectly comfortable with whichever way you want to write it.
 

On one hand, the decision to do fetching in a separate thread isn’t completely necessary because, when multi-streaming, if one stream’s fetching holds up then it’s likely the other stream will have fetch problems as well. But I like having this threading arrangement because it allows a degree of control over how long the program spends in the iterator waiting for new items (via the wait_for parameter).

As far as I can tell, knowing that the fetch happens in parallel isn’t going to change how anyone would write things.
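For illustration, the arrangement is roughly this (hypothetical names and timings, not the draft’s actual code):

import queue
import threading
import time

class ThreadedStream:
    """A daemon thread fetches items into a queue while __next__ waits
    up to wait_for seconds for the next one."""

    def __init__(self, fetch, wait_for=8):
        self.wait_for = wait_for
        self._queue = queue.Queue()
        threading.Thread(target=self._worker, args=(fetch,), daemon=True).start()

    def _worker(self, fetch):
        while True:
            try:
                for item in fetch():
                    self._queue.put(item)
            except Exception:
                pass  # a real version would forward this to an error handler
            time.sleep(1)  # avoid hammering the API between fetches

    def __iter__(self):
        return self

    def __next__(self):
        try:
            return self._queue.get(timeout=self.wait_for)
        except queue.Empty:
            return None  # no new items within wait_for seconds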

But one minor way I know this particular threading approach could make an impression is if the user has really bad internet and decides to Ctrl-C the script. It could take a moment for the script to end unless a second Ctrl-C is issued, which ends things immediately. That probably won’t work on Python 2, though, where you’d be forced to wait.

I’ve only tested the new Stream on Python 3.7. Aside from the difference mentioned above (and the fact that the queue module is named Queue in Python 2), I don’t expect many changes to the Stream class are needed to meet backward-compatibility requirements (albeit integration into PRAW is another matter).
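For reference, that module rename is typically handled with a small shim:

try:
    import queue  # Python 3
except ImportError:
    import Queue as queue  # Python 2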
 

The new Stream is far from PR ready. There are a lot of other decisions that need to be made in order to not break backwards compatibility. Among other things, what do we do with pause_after now that it’s not needed? What we do with pause_after, I imagine, is going to be contingent on how we decide to reintegrate ExponentialCounter. Otherwise, we could just offer the new streaming object via some other new method, though it’s kind of ugly to have two streaming methods around. Thoughts?

In other progress efforts, I’ve tried to tinker with the limit and before parameters, with less than satisfactory outcomes. In the current stream_generator, the limit and before parameters are adjusted based on logic I can’t quite understand. When I varied the values of these parameters I found I ended up missing some items compared to stream_generator. Some guidance on how the limit and before adjusting works in stream_generator would be great.

@bboe (Member) commented Feb 14, 2019

While I don't have time to fully respond, I can respond to this statement:

Some guidance on how the limit and before adjusting works in stream_generator would be great.

The only reason for limit is to attempt to break any caches, the thought being that adjusting that parameter will result in a fresh response. While I'm not looking at the code, I think it's especially important when the before parameter doesn't change, because identical requests would otherwise be re-issued and might hit a cache.

In addition to helping break caches, narrowing in the before parameter means less data needs to be received when it works. Unfortunately, it isn't 100% reliable, as it's not possible to distinguish between an empty response meaning there are no more new items and an empty response meaning the item identified by the before parameter no longer exists in the listing of 1000 items. The item could have been deleted, removed, or simply fallen off the listing due to 1000 newer items. Hence why, I think, only a single request is made with the before parameter before changing to a request for the top-level resource with a different limit parameter.

I hope that helps explain it.

@Pyprohly (Contributor, Author) commented Feb 16, 2019

Thanks @bboe, that information has really helped.

So the idea essentially is to use limit to break any caches, while before can be used to reduce data consumption. Given those goals, I’ve now implemented much improved limit and before adjusting logic into new Stream.

limit adjusting

Current behaviour

A limit of 100 is always used, unless before is None, in which case limit will decrement towards 70 (and loop around). (before is None when the previous fetch resulted in no new items.)

New behaviour

To reduce the potential for cached results, I’ve made the value of limit simply decrement on each request from 100 to 70, looping around.

Rationale

There’s a greater chance of breaking caches if we alter limit more frequently.
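A minimal sketch of that cycle (not the gist’s exact code):

import itertools

# limit counts down 100, 99, ..., 70 and wraps back to 100 on each request.
limit_cycle = itertools.cycle(range(100, 69, -1))
print([next(limit_cycle) for _ in range(3)])  # [100, 99, 98]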

before adjusting

Current behaviour

The value of before is taken from the last seen item. If a fetch results in no items, then a value of None is used for before.

Some notes about this behaviour:

  • Since reddit listings aren’t always produced in chronological order, using the latest item for the value of before can occasionally lead to missing some items.

  • A value of None for before means that up to limit items (potentially already seen) will be produced. If a listing goes silent then this value may continue to be None for a while, in which case up to limit old items will be fetched over and over.

New behaviour

I’ve bound the value of before to a random item from 45 to 18 entries behind the last seen item, rather than using the last seen item itself. A random item is selected from this range on each request. If the bot happens to be streaming a listing that initially has fewer than 45 items in it (e.g., a new subreddit) then None will be used until 45 seen items have accumulated.

Rationale

[edit: outdated information in this section]

In the current system, if a listing goes silent then up to 100 (100 to 70) older items will still get pulled in from reddit. As a benchmark, the most inefficient approach, in terms of data consumption, would be to take 100 (old) items each time.

As explained by @bboe this is done this way because it’s very difficult to tell if an item has been deleted, removed, or fallen off the top 1000 listing due to new entries. A value of None for before will always produce the latest items.

To circumvent the potential that an item has been deleted or removed, after each request the stream picks a new random item from the last 45 to 18 seen entries and uses it for the value of before. One of those items in the range is bound to still exist in the top 1000 listing.

Dealing with the potential that all of the last 45 to 18 seen items have fallen off the top 1000 is another matter. Although this may seem like an extremely unlikely case, if the bot has a really bad connection and disconnects from the internet for a while, and it’s streaming a very active reddit listing, then it’s possible that all of those items will have fallen off the top 1000 listing by the time the bot reconnects.

Since we can’t assume the bot will always have a stable internet connection, to address this problem, None will be used for the value of before in the next fetch request if an exception was thrown during the previous fetch.

Failing all of that, I’ve written in a 1/200 chance that None will be used for before, no matter what.

All this work should hopefully amount to about a 66% overall reduction in data consumption and unnecessary seen item checking.
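Putting those rules together, the selection policy looks roughly like this (a hypothetical helper, not the gist’s exact code; seen_items is ordered oldest to newest):

import random

def choose_before(seen_items, last_fetch_failed):
    # Fall back to None after a failed fetch, or 1 time in 200 regardless.
    if last_fetch_failed or random.random() < 0.005:
        return None
    # Not enough history yet (e.g., a brand-new subreddit).
    if len(seen_items) < 45:
        return None
    # Pick a random item 45 to 18 entries behind the newest seen item.
    return seen_items[-random.randint(18, 45)]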
 

Check out the updated gist.


concurrent.futures

I’ve realised that the concurrent.futures module is available only as a backport in Python 2.7. To ensure native compatibility, I’ve switched to using the threading module instead.

BoundedSet

Since I needed to index into BoundedSet, I’ve added indexing capabilities to it. While I was making modifications, I figured I may as well complete the set interface on it too.

The constructor’s signature has changed but I made sure it’s backward compatible with the old BoundedSet.

By the way, I’ve added support for max_items=None which instantiates an unbounded BoundedSet, so there’s that.
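Roughly, the additions look like this (a sketch of the idea, not the actual modified class):

from collections import OrderedDict

class BoundedSet:
    def __init__(self, max_items=None):
        self.max_items = max_items  # None means unbounded
        self._store = OrderedDict()

    def add(self, item):
        self._store[item] = None
        self._store.move_to_end(item)
        if self.max_items is not None and len(self._store) > self.max_items:
            self._store.popitem(last=False)  # evict the oldest item

    def __contains__(self, item):
        return item in self._store

    def __getitem__(self, index):  # the new indexing capability
        return list(self._store)[index]

    def __len__(self):
        return len(self._store)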

@Pyprohly changed the title from “Current stream implementation is inconvenient” to “Stream implementation is inconvenient” Mar 2, 2019
@PythonCoderAS (Contributor)

I was thinking of adding an option to suppress errors, but then consider the case where someone streams, e.g., the comments of a private subreddit they’ve lost access to: that would just loop forever. So maybe we should keep an internal counter of exceptions, and throw an exception if X errors happen within Y seconds, as sketched below.
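A quick sketch of that idea (hypothetical names, nothing PRAW provides):

import time
from collections import deque

class ErrorBudget:
    """Swallow stream errors, but re-raise if too many occur in a window."""

    def __init__(self, max_errors=5, window=60.0):
        self.max_errors = max_errors
        self.window = window
        self._times = deque()

    def record(self, exception):
        now = time.monotonic()
        self._times.append(now)
        # Drop error timestamps that have aged out of the window.
        while self._times and now - self._times[0] > self.window:
            self._times.popleft()
        if len(self._times) > self.max_errors:
            raise exception  # too many failures too fast; stop suppressing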

@isFakeAccount (Contributor)

Hi, I’m having the same problem. The stream stops if an exception occurs, and the workaround of recreating the stream in the exception handler is not a very good solution.

PRAW stream generator doesn't start after an exception occurs

@github-actions (bot)

This issue is stale because it has been open for 20 days with no activity. Remove the Stale label or comment or this will be closed in 10 days.

@github-actions bot added the Stale label May 20, 2021

@github-actions (bot)

This issue was closed because it has been stale for 10 days with no activity.

@github-actions bot added the Auto-closed - Stale label May 31, 2021
@LilSpazJoekp reopened this Sep 24, 2021

@LilSpazJoekp (Member)

I would like to revisit the suggested implementation. Is this something you'd still like to see merged in @Pyprohly?

@github-actions bot removed the Auto-closed - Stale label Sep 25, 2021
@github-actions bot posted the same closure notice Oct 5, 2021, added the Auto-closed - Stale label, and closed this as completed; @LilSpazJoekp reopened it the same day and removed the Stale and Auto-closed - Stale labels
@github-actions bot added the Stale label Oct 25, 2021 (posting the same stale notice as above); @LilSpazJoekp removed it Oct 25, 2021
@github-actions bot added the Stale label Nov 14, 2021; @LilSpazJoekp added the Discussion label and removed the Stale label Nov 14, 2021
@github-actions bot added the Stale label Dec 5, 2021; @LilSpazJoekp removed it Dec 5, 2021
@github-actions bot added the Stale label Dec 25, 2021; @LilSpazJoekp removed it Dec 25, 2021
@github-actions bot added the Stale label Jan 14, 2022; @LilSpazJoekp removed it Jan 14, 2022
@github-actions bot added the Stale label Feb 3, 2022; @LilSpazJoekp removed it Feb 4, 2022
@oussama-gourari commented May 8, 2024

The way I handle exceptions in my bots is by catching them before they propagate to the stream, by monkey-patching the prawcore.sessions.Session.request method. This allows the stream to resume without having to be recreated. I do this at the top of the script:

import time
from typing import Any

from prawcore.exceptions import BadJSON, RequestException, ServerError, TooManyRequests
from prawcore.sessions import Session


original_session_request = Session.request


def patched_session_request(*args, **kwargs) -> Any:
    while True:
        try:
            return original_session_request(*args, **kwargs)
        except (RequestException, TooManyRequests, BadJSON, ServerError) as e:
            print(e)
            time.sleep(5)


Session.request = patched_session_request

@umutbozdag
I have just created my own stream using the .new method. I can share the example code if you still need it.
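For reference, a hand-rolled stream along those lines might look like this (a sketch, not the commenter's actual code; assumes `subreddit` is a praw.models.Subreddit):

import time

def manual_stream(subreddit, poll_every=5):
    """Poll subreddit.new and yield only unseen submissions."""
    seen = set()
    while True:
        try:
            # Oldest-first, so items are yielded in the order they arrived.
            for submission in reversed(list(subreddit.new(limit=100))):
                if submission.id not in seen:
                    seen.add(submission.id)
                    yield submission
        except Exception as exception:
            print(f"fetch failed, retrying: {exception}")
        time.sleep(poll_every)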
