Skip to content

Commit

Permalink
Merge pull request #105 from DanSheps/develop
Browse files Browse the repository at this point in the history
New backup method
  • Loading branch information
DanSheps authored Oct 2, 2024
2 parents eb0641c + a89d802 commit bab90fd
Show file tree
Hide file tree
Showing 30 changed files with 582 additions and 543 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ jobs:

steps:
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}

Expand Down
10 changes: 10 additions & 0 deletions netbox_config_backup/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,15 @@ class NetboxConfigBackup(PluginConfig):
]
graphql_schema = 'graphql.schema.schema'

def ready(self, *args, **kwargs):
    """Plugin start-up hook: schedule the recurring backup runner.

    When the process is an RQ worker (``manage.py rqworker``), enqueue
    the BackupRunner once at the configured frequency (minutes→runner
    interval).  Other processes (web, migrations) schedule nothing.
    """
    super().ready()
    import sys
    # BUG FIX: the original indexed sys.argv[1] unconditionally, raising
    # IndexError whenever the interpreter was started without a subcommand.
    if len(sys.argv) > 1 and 'rqworker' in sys.argv[1]:
        from netbox import settings
        from netbox_config_backup.jobs.backup import BackupRunner
        from netbox_config_backup.models import BackupJob, Backup
        frequency = settings.PLUGINS_CONFIG.get('netbox_config_backup', {}).get('frequency')
        # Guard against a missing 'frequency' setting: the original divided
        # None by 60 and crashed the worker at import time.
        if frequency:
            BackupRunner.enqueue_once(interval=frequency / 60)


config = NetboxConfigBackup
Empty file.
116 changes: 116 additions & 0 deletions netbox_config_backup/backup/processing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import logging
import os
import time
import traceback

from django.db.models import Q
from django.utils import timezone

from core.choices import JobStatusChoices
from netbox.api.exceptions import ServiceUnavailable
from netbox_config_backup.models import BackupJob, Backup
from netbox_config_backup.utils.db import close_db
from netbox_config_backup.utils.configs import check_config_save_status
from netbox_config_backup.utils.napalm import napalm_init
from netbox_config_backup.utils.rq import can_backup

logger = logging.getLogger(f"netbox_config_backup")


def remove_stale_backupjobs(job: BackupJob):
    """Prune stale BackupJob rows after a successful backup.

    NOTE(review): intentionally a no-op placeholder — it is called from
    ``run_backup`` on completion but performs no cleanup yet.
    """
    pass

def run_backup(job_id):
    """Execute one device backup to completion inside a forked worker.

    Loads the BackupJob identified by ``job_id``, connects to the device
    via NAPALM, stores the retrieved configuration through
    ``backup.set_config`` and records status transitions on the job.
    Any failure is written to ``job.data['error']`` and the job is marked
    failed/errored.
    """
    # We run in a freshly forked process: drop DB connections inherited
    # from the parent so Django opens its own.
    close_db()
    logger.info(f'Starting backup for job {job_id}')
    try:
        job = BackupJob.objects.get(pk=job_id)
    except Exception as e:
        logger.error(f'Unable to load job {job_id}: {e}')
        logger.debug(f'\t{traceback.format_exc()}')
        raise e

    try:
        backup = Backup.objects.get(pk=job.backup.pk)
        backup.refresh_from_db()

        job.status = JobStatusChoices.STATUS_PENDING
        job.pid = os.getpid()
        job.save()

        if not can_backup(backup):
            job.status = JobStatusChoices.STATUS_FAILED
            if not job.data:
                job.data = {}
            job.data.update({'error': f'Cannot backup {backup}'})
            job.full_clean()
            job.save()
            logger.warning(f'Cannot backup {backup}')
            return

        try:
            # Prefer the backup's explicit IP, fall back to the device's
            # primary IP.
            ip = backup.ip if backup.ip is not None else backup.device.primary_ip
        except Exception as e:
            logger.debug(f'{e}: {backup}')
            raise e

        if ip:
            try:
                d = napalm_init(backup.device, ip)
            except (TimeoutError, ServiceUnavailable):
                job.status = JobStatusChoices.STATUS_FAILED
                job.data = {'error': f'Timeout Connecting to {backup.device} with ip {ip}'}
                # BUG FIX: the original *assigned* this message to
                # logger.debug, silently replacing the method for the whole
                # process; call it instead.
                logger.debug(f'Timeout Connecting to {backup.device} with ip {ip}')
                job.save()
                return

            job.status = JobStatusChoices.STATUS_RUNNING
            job.started = timezone.now()
            job.save()
            try:
                try:
                    status = check_config_save_status(d)
                    # Net effect of this ladder: persist ``status`` whenever
                    # its truthiness differs from the stored value, or the
                    # stored value is still None; matching values are kept.
                    if status is not None:
                        if status and not backup.config_status:
                            backup.config_status = status
                            backup.save()
                        elif not status and backup.config_status:
                            backup.config_status = status
                            backup.save()
                        elif not status and backup.config_status is None:
                            backup.config_status = status
                            backup.save()
                        elif status and backup.config_status is None:
                            backup.config_status = status
                            backup.save()
                except Exception as e:
                    logger.error(f'{backup}: had error setting backup status: {e}')

                configs = d.get_config()
                backup.set_config(configs)
            finally:
                # BUG FIX: close the NAPALM session even when get_config /
                # set_config raises, so device sessions are not leaked.
                d.close()

            logger.info(f'{backup}: Backup complete')
            job.status = JobStatusChoices.STATUS_COMPLETED
            job.completed = timezone.now()
            job.save()
            remove_stale_backupjobs(job=job)
        else:
            job.status = JobStatusChoices.STATUS_FAILED
            if not job.data:
                job.data = {}
            job.data.update({'error': f'{backup}: No IP set'})
            job.full_clean()
            job.save()
            logger.debug(f'{backup}: No IP set')
    except Exception as e:
        logger.error(f'Exception in {job_id}: {e}')
        logger.info(f'\t{traceback.format_exc()}')
        if job:
            job.status = JobStatusChoices.STATUS_ERRORED
            if not job.data:
                job.data = {}
            job.data.update({'error': f'{e}'})
            job.full_clean()
            job.save()
12 changes: 12 additions & 0 deletions netbox_config_backup/filtersets.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from django.utils.translation import gettext as _
from netaddr import AddrFormatError

from core.choices import JobStatusChoices
from ipam.models import IPAddress
from netbox.filtersets import NetBoxModelFilterSet, BaseFilterSet
from dcim.models import Device
Expand All @@ -13,6 +14,17 @@
from utilities.filters import MultiValueCharFilter


class BackupJobFilterSet(BaseFilterSet):
    """FilterSet for BackupJob list views: free-text ``q`` plus id/status.

    NOTE(review): ``method='search'`` requires a ``search`` method on this
    class; none is visible in this diff — confirm it exists (inherited or
    defined below the fold) or filtering by ``q`` will fail at runtime.
    """

    # Free-text search filter, dispatched to self.search().
    q = django_filters.CharFilter(
        method='search',
        label=_('Search'),
    )

    class Meta:
        model = models.BackupJob
        fields = ['id', 'status']


class BackupFilterSet(BaseFilterSet):
q = django_filters.CharFilter(
method='search',
Expand Down
17 changes: 16 additions & 1 deletion netbox_config_backup/forms.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@
from django.forms import CharField
from django.utils.translation import gettext as _

from core.choices import JobStatusChoices
from dcim.choices import DeviceStatusChoices
from dcim.models import Device
from ipam.models import IPAddress
from netbox.forms import NetBoxModelForm, NetBoxModelBulkEditForm
from netbox_config_backup.models import Backup
from netbox_config_backup.models import Backup, BackupJob
from utilities.forms import add_blank_choice
from utilities.forms.fields import DynamicModelChoiceField, DynamicModelMultipleChoiceField, CommentField

__all__ = (
'BackupForm',
'BackupJobFilterSetForm',
'BackupFilterSetForm',
'BackupBulkEditForm',
)
Expand Down Expand Up @@ -59,6 +62,18 @@ def clean(self):
raise ValidationError({'device': f'{device}\'s platform ({device.platform}) has no napalm driver'})


class BackupJobFilterSetForm(forms.Form):
    """Filter form for the BackupJob list view (search box + status)."""

    # Model hook consumed by NetBox's filter-form machinery.
    model = BackupJob
    field_order = [
        'q', 'status',
    ]
    # NOTE(review): add_blank_choice() on a MultipleChoiceField produces an
    # empty-string option inside a multi-select — likely unintended; confirm
    # against the equivalent BackupFilterSetForm usage.
    status = forms.MultipleChoiceField(
        required=False,
        choices=add_blank_choice(JobStatusChoices),
        label=_('Status')
    )


class BackupFilterSetForm(forms.Form):
model = Backup
field_order = [
Expand Down
Empty file.
149 changes: 149 additions & 0 deletions netbox_config_backup/jobs/backup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
import logging
import time
import uuid
import traceback
from datetime import timedelta
from multiprocessing import Process

from django.db.models import Q
from django.utils import timezone
from rq.job import JobStatus

from core.choices import JobStatusChoices
from netbox.jobs import JobRunner
from netbox_config_backup.backup.processing import run_backup
from netbox_config_backup.choices import StatusChoices
from netbox_config_backup.models import Backup, BackupJob
from netbox_config_backup.utils.db import close_db
from netbox_config_backup.utils.rq import can_backup

logger = logging.getLogger(f"netbox_config_backup")


class SchedulerRunner(JobRunner):
    """Placeholder JobRunner declaring only a display name.

    NOTE(review): no ``run`` implementation — scheduling presumably lives
    in BackupRunner; confirm this class is still needed.
    """

    class Meta:
        name = "The scheduler"




class BackupRunner(JobRunner):
    """Scheduler job that fans device backups out to worker processes.

    Each scheduled BackupJob is executed by ``run_backup`` in its own
    ``multiprocessing.Process``.  The runner tracks its children and
    marks any job whose process dies before completing as errored.
    """

    # backup pk -> {'process': Process, 'backup': backup pk, 'job': job pk}
    # NOTE(review): class-level mutable state shared by all instances —
    # assumes only one runner instance is active at a time; confirm.
    processes = {}

    class Meta:
        name = 'The Backup Job Runner'

    def clean_stale_jobs(self):
        """Error out hung jobs and fail superseded scheduled duplicates."""
        # BUG FIX: the original used status=<list of choices>, which compares
        # the field to the whole list; membership requires ``status__in``.
        jobs = BackupJob.objects.order_by('created').filter(
            status__in=JobStatusChoices.ENQUEUED_STATE_CHOICES,
        ).prefetch_related('device')
        scheduled = jobs.filter(status=JobStatusChoices.STATUS_SCHEDULED)
        # Anything enqueued more than 30 minutes ago is considered hung.
        stale = jobs.filter(scheduled__lt=timezone.now() - timedelta(minutes=30))

        for job in stale:
            if job.pid:
                # TODO(review): a pid is recorded but the process is never
                # signalled — decide whether hung workers should be killed.
                pass
            job.status = JobStatusChoices.STATUS_ERRORED
            if not job.data:
                job.data = {}
            job.data.update({'error': 'Job hung'})
            job.save()
            job.refresh_from_db()
            # Message corrected: the job is marked errored, not deleted.
            logger.warning(f'Job {job.backup} appears stuck, marking errored')

        for job in scheduled:
            # Keep only the most recent scheduled job per backup; fail the rest.
            if job != scheduled.filter(backup=job.backup).last():
                job.status = JobStatusChoices.STATUS_FAILED
                if not job.data:
                    job.data = {}
                job.data.update({'error': 'Process terminated'})
                job.save()

    def schedule_jobs(self):
        """Create or refresh a scheduled BackupJob for every active backup."""
        backups = Backup.objects.filter(status=StatusChoices.STATUS_ACTIVE, device__isnull=False)
        for backup in backups:
            if can_backup(backup):
                logger.debug(f'Queuing device {backup.device} for backup')
                jobs = BackupJob.objects.filter(backup=backup, status__in=JobStatusChoices.ENQUEUED_STATE_CHOICES)
                job = jobs.last()
                if job is not None:
                    # Re-attach the latest queued job to this runner run.
                    job.runner = self.job
                    job.status = JobStatusChoices.STATUS_SCHEDULED
                    job.scheduled = timezone.now()
                    job.save()
                else:
                    job = BackupJob(
                        runner=self.job,
                        backup=backup,
                        status=JobStatusChoices.STATUS_SCHEDULED,
                        scheduled=timezone.now(),
                        job_id=uuid.uuid4(),
                        data={},
                    )
                    job.full_clean()
                    job.save()
            else:
                # Backup not currently eligible: fail anything still queued.
                jobs = BackupJob.objects.filter(backup=backup, status__in=JobStatusChoices.ENQUEUED_STATE_CHOICES)
                for job in jobs:
                    job.status = JobStatusChoices.STATUS_FAILED
                    if not job.data:
                        job.data = {}
                    job.data.update({'error': f'Cannot queue job'})
                    job.save()

    def run_processes(self):
        """Fork a worker process for every scheduled job."""
        for job in BackupJob.objects.filter(status=JobStatusChoices.STATUS_SCHEDULED):
            try:
                process = self.fork_process(job)
                process.join(1)  # brief join so immediate failures surface here
            except Exception as e:
                job.status = JobStatusChoices.STATUS_FAILED
                # BUG FIX: job.data may be None; initialize before indexing,
                # consistent with every other error path in this class.
                if not job.data:
                    job.data = {}
                job.data['error'] = str(e)
                job.save()

    def fork_process(self, job):
        """Start and register a child process running ``run_backup`` for *job*."""
        # Children must not inherit the parent's live DB connections.
        close_db()
        process = Process(target=run_backup, args=(job.pk, ), )
        self.processes[job.backup.pk] = {
            'process': process,
            'backup': job.backup.pk,
            'job': job.pk,
        }
        process.start()
        logger.debug(f'Forking process {process.pid} for {job.backup} backup')
        return process

    def handle_processes(self):
        """Reap finished children; error any job that died incomplete."""
        close_db()
        for pk in list(self.processes.keys()):
            entry = self.processes.get(pk, {})
            process = entry.get('process')
            job_pk = entry.get('job')
            backup = entry.get('backup')
            if not process.is_alive():
                logger.debug(f'Terminating process {process.pid} with job pk of {pk} for {backup}')
                process.terminate()
                del self.processes[pk]
                job = BackupJob.objects.filter(pk=job_pk).first()
                # A child that exited without marking its job completed is
                # treated as a failure.
                if job and job.status != JobStatusChoices.STATUS_COMPLETED:
                    job.status = JobStatusChoices.STATUS_ERRORED
                    if not job.data:
                        job.data = {}
                    job.data.update({'error': 'Process terminated'})
                    job.save()

    def run(self, *args, **kwargs):
        """Entry point: clean, schedule, fork, then poll until children exit."""
        try:
            self.clean_stale_jobs()
            self.schedule_jobs()
            self.run_processes()
            while True:
                self.handle_processes()
                if not self.processes:
                    return
                time.sleep(1)
        except Exception as e:
            logger.warning(f'{traceback.format_exc()}')
            logger.error(f'{e}')
12 changes: 0 additions & 12 deletions netbox_config_backup/management/commands/enqueue_if_needed.py

This file was deleted.

Loading

0 comments on commit bab90fd

Please sign in to comment.