Skip to content

Commit

Permalink
job: refactor args
Browse files Browse the repository at this point in the history
* make `since` a predefined argument
* make extra args optional
  • Loading branch information
ntarocco committed Oct 10, 2024
1 parent 118e2de commit 071c3cb
Showing 1 changed file with 29 additions and 11 deletions.
40 changes: 29 additions & 11 deletions invenio_jobs/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,24 @@

"""Jobs module."""

from abc import ABC, abstractmethod
from abc import ABC

from marshmallow import fields


class JobType(ABC):
"""Base class to register celery tasks available in the admin panel."""
"""Base class to define a job."""

arguments_schema = None
task = None
id = None
title = None
description = None

task = None

arguments_schema = {
"since": fields.DateTime(required=False),
}

@classmethod
def create(
cls, job_cls_name, arguments_schema, id_, task, description, title, attrs=None
Expand All @@ -39,17 +45,29 @@ def create(
),
)

@abstractmethod
def default_args(self, *args, **kwargs):
"""Abstract method to enforce implementing default arguments."""
@classmethod
def get_task_args(cls, job_obj, **kwargs):
"""Override to define extra arguments to be injected on task execution.
:param job_obj (Job): the Job object.
:return: a dict of arguments to be injected on task execution.
"""
return {}

@classmethod
def build_task_arguments(cls, job_obj, custom_args=None, **kwargs):
"""Build arguments to be passed to the task.
def build_task_arguments(cls, job_obj, since=None, custom_args=None, **kwargs):
"""Build dict of arguments injected on task execution.
Custom arguments can be passed to overwrite the default arguments of a job.
:param job_obj (Job): the Job object.
:param since (datetime): last time the job was executed.
:param custom_args (dict): when provided, takes precedence over any other
provided argument.
:return: a dict of arguments to be injected on task execution.
"""
if custom_args:
return custom_args
return cls.default_args(job_obj, **kwargs)

# `since` is a predefined argument
if since is None and job_obj.last_runs["success"]:
since = job_obj.last_runs["success"].started_at
return {"since": since, **cls.get_task_args(job_obj, **kwargs)}

0 comments on commit 071c3cb

Please sign in to comment.