Skip to content

Commit

Permalink
Changes to Job model to handle RQ better
Browse files Browse the repository at this point in the history
  • Loading branch information
DanSheps committed Jan 12, 2022
1 parent 5c461d7 commit d9383d7
Showing 1 changed file with 53 additions and 35 deletions.
88 changes: 53 additions & 35 deletions netbox_config_backup/models/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ def __str__(self):

def delete(self, using=None, keep_parents=False):
queue = get_queue('netbox_config_backup.jobs')
jobs = queue.scheduled_job_registry.get_job_ids()
for job in jobs:
queue.scheduled_job_registry.remove(job)

queue.fetch_job(f'{self.job_id}').cancel()
queue.fetch_job(f'{self.job_id}').remove()

super().delete(using=using, keep_parents=keep_parents)

Expand Down Expand Up @@ -145,60 +145,78 @@ def needs_enqueue(cls, backup, job_id=None):
backup.device.platform.napalm_driver == '' or backup.device.platform.napalm_driver is None:
return False

jobs = cls.objects.filter(backup=backup)
queued = jobs.filter(status__in=[JobResultStatusChoices.STATUS_RUNNING, JobResultStatusChoices.STATUS_PENDING])
if job_id is not None:
queued = queued.exclude(job_id=job_id)

if queued.count() > 0:
for job in queued.all():
found_job_id = f'{job.job_id}'
if queue.fetch_job(f'{job.job_id}') is not None and (f'{job.job_id}' in scheduled_jobs or
f'{job.job_id}' in started_jobs):
return False
if cls.is_queued(backup, job_id):
return False

return True

@classmethod
def is_running(cls, backup):
def is_running(cls, backup, job_id=None):
queue = get_queue('netbox_config_backup.jobs')
registry = StartedJobRegistry(queue=queue)

jobs = cls.objects.filter(backup=backup)
queued = jobs.filter(status__in=[JobResultStatusChoices.STATUS_RUNNING, JobResultStatusChoices.STATUS_PENDING])
if queued.count() > 0:
for job in queued.all():
if queue.fetch_job(f'{job.job_id}') is not None:
return True
for job_id in registry.get_job_ids():
job = queue.fetch_job(f'{job_id}')
if job.description == f'backup-{backup.device.pk}':

if job_id is not None:
queued.exclude(job_id=job_id)

for backupjob in queued.all():
job = queue.fetch_job(f'{backupjob.job_id}')
if job and job.is_started and job.id in queue.started_job_registry.get_job_ids():
return True
elif job and job.is_started and job.id not in queue.started_job_registry.get_job_ids():
job.cancel()
backupjob.status = JobResultStatusChoices.STATUS_FAILED
backupjob.save()
logger.warning(f'Job in queue but not in a registry, cancelling')
elif job and job.is_canceled:
backupjob.status = JobResultStatusChoices.STATUS_FAILED
backupjob.save()
return False

@classmethod
def is_queued(cls, backup):
def is_queued(cls, backup, job_id=None):
queue = get_queue('netbox_config_backup.jobs')
registry = ScheduledJobRegistry(queue=queue)

scheduled_jobs = queue.scheduled_job_registry.get_job_ids()
started_jobs = queue.started_job_registry.get_job_ids()

jobs = cls.objects.filter(backup=backup)
queued = jobs.filter(status__in=[JobResultStatusChoices.STATUS_RUNNING, JobResultStatusChoices.STATUS_PENDING])
if queued.count() > 0:
for job in queued.all():
if queue.fetch_job(f'{job.job_id}') is not None:

if job_id is not None:
queued.exclude(job_id=job_id)

for backupjob in queued.all():
job = queue.fetch_job(f'{backupjob.job_id}')
if job and (job.is_scheduled or job.is_queued) and job.id in scheduled_jobs + started_jobs:
return True
for job_id in registry.get_job_ids():
job = queue.fetch_job(f'{job_id}')
if backup.device == None:
return False
if job.description == f'backup-{backup.device.pk}':
return True
elif job and (job.is_scheduled or job.is_queued) and job.id not in scheduled_jobs + started_jobs:
job.cancel()
backupjob.status = JobResultStatusChoices.STATUS_FAILED
backupjob.save()
logger.warning(f'Job in queue but not in a registry, cancelling')
elif job and job.is_canceled:
backupjob.status = JobResultStatusChoices.STATUS_FAILED
backupjob.save()
return False

@classmethod
def remove_orphaned(cls):
queue = get_queue('netbox_config_backup.jobs')
registry = ScheduledJobRegistry(queue=queue)

for job_id in registry.get_job_ids():
try:
BackupJob.objects.get(job_id=job_id)
except BackupJob.DoesNotExist:
registry.remove(job_id)

@classmethod
def remove_queued(cls, backup):
queue = get_queue('netbox_config_backup.jobs')
registry = ScheduledJobRegistry(queue=queue)
for job_id in registry.get_job_ids():
job = queue.fetch_job(f'{job.job_id}')
job = queue.fetch_job(f'{job_id}')
if backup.device is not None and job.description == f'backup-{backup.device.pk}':
registry.remove(f'{job_id}')

0 comments on commit d9383d7

Please sign in to comment.