Adding recurring tasks? #63

Open
ThibTrip opened this issue Feb 17, 2021 · 4 comments
Comments

@ThibTrip

Hello,

Is it possible to add recurring tasks, similar to a cronjob? I scrolled quickly through the issues and the source code and could not find any indication of that. If this functionality does not exist, I'd be interested in easy workaround ideas 🤔.

The use case is a queue of URLs to be crawled regularly from multiple computers (e.g. every day at 1 AM). To make things more difficult, the URLs are crawled at different times (some at 1 AM, others at 2 AM, etc.).

It seems this was implemented in the django-pq library but it is unmaintained and there has not been a commit since 2014. Also your library has been very reliable for me so it'd be nice to use it for this as well 👍.

Specifications

  • Version: 1.9.0
  • Python version: 3.8.3
@malthe
Owner

malthe commented Feb 18, 2021

You could use a cronjob to add items to the queue. You can then either use schedule_at to defer execution to a later time or just rely on the cronjob schedule to make the queue item available for work.
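
A minimal sketch of the cronjob approach, assuming a script that cron runs once a day (for example at midnight UTC). The function name `enqueue_daily` and the `CRAWL_PLAN` mapping are hypothetical; the only thing assumed of `queue` is a `put(data, schedule_at=...)` method, as on a pq queue.

```python
import datetime

# Hypothetical plan: UTC hour of day -> URLs to crawl at that hour.
CRAWL_PLAN = {
    1: ["https://example.com/a"],
    2: ["https://example.com/b"],
}


def enqueue_daily(queue, plan, today=None):
    """Put one item per URL, deferred to its hour on `today` (UTC).

    `queue` is anything with a put(data, schedule_at=...) method.
    Returns the number of items that were put.
    """
    if today is None:
        today = datetime.datetime.now(datetime.timezone.utc).date()
    count = 0
    for hour, urls in sorted(plan.items()):
        # Timezone-aware datetime for `hour`:00 UTC on the given day.
        run_at = datetime.datetime.combine(
            today, datetime.time(hour=hour), tzinfo=datetime.timezone.utc
        )
        for url in urls:
            queue.put({"url": url}, schedule_at=run_at)
            count += 1
    return count
```

With this layout a single crontab entry covers every crawl time, and workers only see each item once its `schedule_at` has passed.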

@stas
Collaborator

stas commented Feb 18, 2021

@ThibTrip I second @malthe here

Add the logic to self-schedule a job in the future and pq will take care of it (if your job does too many things, create a dispatcher worker). 🙌
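
A rough sketch of what such a dispatcher could look like, assuming one recurring "dispatch" item that fans out the actual crawl items and then puts itself back for the next day. `next_daily_run`, `dispatch`, and the `urls`/`hour` keys are made up for illustration; `queue` only needs a `put(data, schedule_at=...)` method.

```python
import datetime

UTC = datetime.timezone.utc


def next_daily_run(after, hour):
    """Return the first `hour`:00 UTC strictly after `after`.

    `after` must be a timezone-aware datetime.
    """
    candidate = after.astimezone(UTC).replace(
        hour=hour, minute=0, second=0, microsecond=0
    )
    if candidate <= after:
        candidate += datetime.timedelta(days=1)
    return candidate


def dispatch(queue, data, now):
    """Fan out one crawl item per URL, then re-put the dispatch item itself."""
    for url in data["urls"]:
        # Work items are available immediately (no schedule_at).
        queue.put({"kind": "crawl", "url": url})
    next_run = next_daily_run(now, data["hour"])
    # The dispatcher schedules its own next run.
    queue.put(data, schedule_at=next_run)
    return next_run
```

Keeping the rescheduling in one small dispatcher means the crawl workers themselves do not need to know anything about the schedule.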

@ThibTrip
Author

ThibTrip commented Feb 18, 2021

Thanks for your quick answers 👍 !

@stas This is the solution I went for 🙈 ... Not very proud of it but it seems to work well. I am unsure whether timezones are handled correctly by my code though 🤔. pq does not seem to save the timezone for schedule_at (because I always provide schedule_at as a datetime with UTC timezone).

import datetime
import pytz
from croniter import croniter # pip install croniter
from loguru import logger # pip install loguru

def requeue_job(queue, job, delete_job=False, engine=None):
    """
    Requeues a job created with the use of the library pq.
    This is a workaround for making recurring tasks.

    The job must fulfill the following conditions:
    * have a key called "cron" with a valid cronjob expression in its data
    * have a datetime for its schedule_at attribute

    **WARNING**: we assume the timezone of the datetime is UTC (pq does not
    seem to save the TZ information in the attribute schedule_at).

    Parameters
    ----------
    queue : pq.Queue
    job : pq.Job
    delete_job : bool, default False
        Whether to delete the job that's been passed (happens after requeuing it).
        If True, an engine must be provided.
    engine : sqlalchemy.engine.base.Engine or None, default None
        Needed if delete_job is True

    Examples
    --------
    >>> import datetime
    >>>
    >>> # let's assume you have defined a queue somewhere
    >>> queue.put(data={'url':'https://www.github.com', 'cron':'0 2 * * *'},
    ...           schedule_at=datetime.datetime.now().astimezone(datetime.timezone.utc))
    >>>
    >>> job = queue.get()
    >>> # do something...
    >>> requeue_job(queue=queue, job=job)
    """
    # verify data
    ## type
    data = job.data
    if not isinstance(data, dict):
        raise TypeError(f'Expecting job.data to be dict. Received type {type(data)} instead')
    ## content
    if 'cron' not in data:
        raise ValueError('Key "cron" is missing from job.data')
    if not job.schedule_at:
        raise ValueError('schedule_at must have been provided for the job (it is None)!')

    # make sure schedule_at is timezone-aware (naive values are assumed to be UTC)
    schedule_at = job.schedule_at
    if schedule_at.tzinfo is None:
        schedule_at = pytz.utc.localize(schedule_at)

    # get next schedule based on cron expression, starting from the timezone-aware value
    cron, start_time = data['cron'], schedule_at
    cron_iter = croniter(expr_format=cron, start_time=start_time)
    schedule_at = cron_iter.get_next(datetime.datetime)
    logger.debug(f'Requeuing job {job.id}. Next execution: {schedule_at} (base_time: {start_time}, cron: {cron})')

    # requeue job
    new_job_id = queue.put(job.data, schedule_at=schedule_at)

    # delete job
    if delete_job:
        if not engine:
            raise ValueError('I need a sqlalchemy engine to drop jobs! You have not provided any')
        engine.execute(f'DELETE FROM "{queue.name}" WHERE id=%(job_id)s;', job_id=job.id)
    return new_job_id
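
On the timezone question, one way to avoid ambiguity is to normalise every datetime to UTC before it goes into or comes out of the queue. The helper below is only a sketch using the standard library; `as_utc` is a hypothetical name, and it makes the same assumption as the docstring above, namely that naive datetimes already represent UTC.

```python
import datetime


def as_utc(dt):
    """Return `dt` as a timezone-aware UTC datetime.

    Naive datetimes are ASSUMED to already be in UTC and are only tagged;
    aware datetimes are converted, so the instant in time is preserved.
    """
    if dt.tzinfo is None or dt.utcoffset() is None:
        return dt.replace(tzinfo=datetime.timezone.utc)
    return dt.astimezone(datetime.timezone.utc)
```

For example, `as_utc(job.schedule_at)` could replace the `pytz.utc.localize` branch, and applying the same helper to values passed to `queue.put` keeps cron expressions interpreted in UTC on both sides.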

@malthe
Owner

malthe commented Feb 18, 2021

I think this is a nice solution for the case where you want to schedule a subsequent run only if the queue item is being "worked".
