-
Notifications
You must be signed in to change notification settings - Fork 41
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adding recurring tasks? #63
Comments
You could use a cronjob to add items to the queue. You can then either use |
Thanks for your quick answers 👍 ! @stas This is the solution I went for 🙈 ... Not very proud of it but it seems to work well. I am unsure whether timezones are well respected with my code though 🤔 . import datetime
import pytz
from croniter import croniter # pip install croniter
from loguru import logger # pip install loguru
def requeue_job(queue, job, delete_job=False, engine=None):
"""
Requeues a job created with the use of the library pq.
This is a workaround for making recurring tasks.
The job must fullfill the following conditions:
* have a key called "cron" with a valid cronjob expression in its data
* have a datetime for its schedule_at attribute
**WARNING**: we assume the timezone of the datetime is UTC (pq does not
seem to save the TZ information in the attribute schedule_at).
Parameters
----------
queue : pq.Queue
job : pq.Job
delete_job : bool, default False
Whether to delete the job that's been passed (happens after requeuing it).
If True, an engine must be provided.
engine : sqlalchemy.engine.base.Engine or None, default None
Needed if delete_job is True
Examples
--------
>>> import datetime
>>>
>>> # let's assume you have defined a queue somewhere
>>> queue.put(data={'url':'https://www.github.com', 'cron':'0 2 * * *'},
... schedule_at=datetime.datetime.now().astimezone(datetime.timezone.utc))
>>>
>>> job = queue.get()
>>> # do something...
>>> requeue_job(queue=queue, job=job)
"""
# verify data
## type
data = job.data
if not isinstance(data, dict):
raise TypeError(f'Expecting job.data to be dict. Received type {type(data)} instead')
## content
if 'cron' not in data:
raise ValueError('Key "cron" is missing from job.data')
if not job.schedule_at:
raise ValueError('schedule_at must have been provided for the job (it is None)!')
# make sure schedule_at is timezoned
schedule_at = job.schedule_at
if schedule_at.tzinfo is None:
schedule_at = pytz.utc.localize(schedule_at)
# get next schedule based on cron expression
cron, start_time = data['cron'], job.schedule_at
iter = croniter(expr_format=cron, start_time=start_time)
schedule_at = iter.get_next(datetime.datetime)
logger.debug(f'Requeuing job {job.id}. Next execution: {schedule_at} (base_time: {start_time}, cron: {cron})')
# requeue job
new_job_id = queue.put(job.data, schedule_at=schedule_at)
# delete job
if delete_job:
if not engine:
raise ValueError('I need a sqlalchemy engine to drop jobs! You have not provided any')
engine.execute(f'DELETE FROM "{queue.name}" WHERE id=%(job_id)s;', job_id=job.id)
return new_job_id |
I think this is a nice solution for the case where you want to schedule a subsequent run only if the queue item is being "worked". |
Hello,
is it possible to add recurring tasks similarly to a cronjob? I scrolled quickly through the issues and the source code and could not find any indication for that. If this functionality does not exist I'd be interested in easy workaround ideas 🤔 .
The use case is a queue of URLs to be crawled from multiple computers regularly (e.g. everyday at 1 AM). To make things more difficult all URLs are crawled at different times (some at 1 AM, others at 2 AM etc.).
It seems this was implemented in the django-pq library but it is unmaintained and there has not been a commit since 2014. Also your library has been very reliable for me so it'd be nice to use it for this as well 👍.
Specifications
The text was updated successfully, but these errors were encountered: