diff --git a/netbox_config_backup/models/jobs.py b/netbox_config_backup/models/jobs.py index fb51651..1203e6d 100644 --- a/netbox_config_backup/models/jobs.py +++ b/netbox_config_backup/models/jobs.py @@ -57,9 +57,9 @@ def __str__(self): def delete(self, using=None, keep_parents=False): queue = get_queue('netbox_config_backup.jobs') - jobs = queue.scheduled_job_registry.get_job_ids() - for job in jobs: - queue.scheduled_job_registry.remove(job) + + queue.fetch_job(f'{self.job_id}').cancel() + queue.fetch_job(f'{self.job_id}').remove() super().delete(using=using, keep_parents=keep_parents) @@ -145,60 +145,78 @@ def needs_enqueue(cls, backup, job_id=None): backup.device.platform.napalm_driver == '' or backup.device.platform.napalm_driver is None: return False - jobs = cls.objects.filter(backup=backup) - queued = jobs.filter(status__in=[JobResultStatusChoices.STATUS_RUNNING, JobResultStatusChoices.STATUS_PENDING]) - if job_id is not None: - queued = queued.exclude(job_id=job_id) - - if queued.count() > 0: - for job in queued.all(): - found_job_id = f'{job.job_id}' - if queue.fetch_job(f'{job.job_id}') is not None and (f'{job.job_id}' in scheduled_jobs or - f'{job.job_id}' in started_jobs): - return False + if cls.is_queued(backup, job_id): + return False + return True @classmethod - def is_running(cls, backup): + def is_running(cls, backup, job_id=None): queue = get_queue('netbox_config_backup.jobs') - registry = StartedJobRegistry(queue=queue) jobs = cls.objects.filter(backup=backup) queued = jobs.filter(status__in=[JobResultStatusChoices.STATUS_RUNNING, JobResultStatusChoices.STATUS_PENDING]) - if queued.count() > 0: - for job in queued.all(): - if queue.fetch_job(f'{job.job_id}') is not None: - return True - for job_id in registry.get_job_ids(): - job = queue.fetch_job(f'{job_id}') - if job.description == f'backup-{backup.device.pk}': + + if job_id is not None: + queued.exclude(job_id=job_id) + + for backupjob in queued.all(): + job = queue.fetch_job(f'{backupjob.job_id}') + if job and job.is_started and job.id in queue.started_job_registry.get_job_ids(): return True + elif job and job.is_started and job.id not in queue.started_job_registry.get_job_ids(): + job.cancel() + backupjob.status = JobResultStatusChoices.STATUS_FAILED + backupjob.save() + logger.warning(f'Job in queue but not in a registry, cancelling') + elif job and job.is_canceled: + backupjob.status = JobResultStatusChoices.STATUS_FAILED + backupjob.save() return False @classmethod - def is_queued(cls, backup): + def is_queued(cls, backup, job_id=None): queue = get_queue('netbox_config_backup.jobs') - registry = ScheduledJobRegistry(queue=queue) + + scheduled_jobs = queue.scheduled_job_registry.get_job_ids() + started_jobs = queue.started_job_registry.get_job_ids() jobs = cls.objects.filter(backup=backup) queued = jobs.filter(status__in=[JobResultStatusChoices.STATUS_RUNNING, JobResultStatusChoices.STATUS_PENDING]) - if queued.count() > 0: - for job in queued.all(): - if queue.fetch_job(f'{job.job_id}') is not None: + + if job_id is not None: + queued.exclude(job_id=job_id) + + for backupjob in queued.all(): + job = queue.fetch_job(f'{backupjob.job_id}') + if job and (job.is_scheduled or job.is_queued) and job.id in scheduled_jobs + started_jobs: return True - for job_id in registry.get_job_ids(): - job = queue.fetch_job(f'{job_id}') - if backup.device == None: - return False - if job.description == f'backup-{backup.device.pk}': - return True + elif job and (job.is_scheduled or job.is_queued) and job.id not in scheduled_jobs + started_jobs: + job.cancel() + backupjob.status = JobResultStatusChoices.STATUS_FAILED + backupjob.save() + logger.warning(f'Job in queue but not in a registry, cancelling') + elif job and job.is_canceled: + backupjob.status = JobResultStatusChoices.STATUS_FAILED + backupjob.save() return False + @classmethod + def remove_orphaned(cls): + queue = get_queue('netbox_config_backup.jobs') + registry = ScheduledJobRegistry(queue=queue) + + for job_id in registry.get_job_ids(): + try: + BackupJob.objects.get(job_id=job_id) + except BackupJob.DoesNotExist: + registry.remove(job_id) + @classmethod def remove_queued(cls, backup): queue = get_queue('netbox_config_backup.jobs') registry = ScheduledJobRegistry(queue=queue) for job_id in registry.get_job_ids(): - job = queue.fetch_job(f'{job.job_id}') + job = queue.fetch_job(f'{job_id}') if backup.device is not None and job.description == f'backup-{backup.device.pk}': registry.remove(f'{job_id}') \ No newline at end of file