Skip to content

Commit

Permalink
improved celery time limit handling and minor bugfix
Browse files Browse the repository at this point in the history
  • Loading branch information
MarvinDo committed Aug 16, 2024
1 parent 64793cf commit 6fbfe74
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 10 deletions.
2 changes: 1 addition & 1 deletion src/common/db_IO.py
Original file line number Diff line number Diff line change
Expand Up @@ -3760,7 +3760,7 @@ def get_heredicare_queue_entries(self, publish_queue_ids: list, variant_id):

def insert_publish_request(self, user_id: int, upload_heredicare: bool, upload_clinvar: bool, variant_ids: list):
command = "INSERT INTO publish_queue (user_id, upload_clinvar, upload_heredicare, variant_ids) VALUES (%s, %s, %s, %s)"
self.cursor.execute(command, (user_id, upload_clinvar, upload_heredicare, ";".join(variant_ids)))
self.cursor.execute(command, (user_id, upload_clinvar, upload_heredicare, ";".join([str(x) for x in variant_ids])))
self.conn.commit()
publish_queue_id = self.get_last_insert_id()
return publish_queue_id
Expand Down
8 changes: 4 additions & 4 deletions src/frontend_celery/webapp/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def start_variant_import(vids, user_id, user_roles, conn: Connection): # starts

return import_queue_id

@celery.task(bind=True, retry_backoff=5, max_retries=3, time_limit=6000)
@celery.task(bind=True, retry_backoff=5, max_retries=3, soft_time_limit=6000)
def heredicare_variant_import(self, vids, user_id, user_roles, import_queue_id):
"""Background task for fetching variants from HerediCare"""
self.update_state(state='PROGRESS')
Expand Down Expand Up @@ -205,7 +205,7 @@ def retry_variant_import(import_variant_queue_id, user_id, user_roles, conn: Con
# this uses exponential backoff in case there is a http error
# this will retry 3 times before giving up
# first retry after 5 seconds, second after 25 seconds, third after 125 seconds (if task queue is empty that is)
@celery.task(bind=True, retry_backoff=5, max_retries=3, time_limit=600)
@celery.task(bind=True, retry_backoff=5, max_retries=3, soft_time_limit=600)
def import_one_variant_heredicare(self, vid, user_id, user_roles, import_variant_queue_id):
"""Background task for fetching variants from HerediCare"""
self.update_state(state='PROGRESS')
Expand Down Expand Up @@ -738,7 +738,7 @@ def validate_and_insert_cnv(chrom: str, start: int, end: int, sv_type: str, impr
# this uses exponential backoff in case there is a http error
# this will retry 3 times before giving up
# first retry after 5 seconds, second after 25 seconds, third after 125 seconds (if task queue is empty that is)
@celery.task(bind=True, retry_backoff=5, max_retries=3, time_limit=600)
@celery.task(bind=True, retry_backoff=5, max_retries=3, soft_time_limit=600)
def annotate_all_variants(self, variant_ids, selected_job_config, user_id, roles):
"""Background task for running the annotation service"""
conn = Connection(roles)
Expand All @@ -764,7 +764,7 @@ def start_annotation_service(variant_id, user_id, conn: Connection, job_config =
# this uses exponential backoff in case there is a http error
# this will retry 3 times before giving up
# first retry after 5 seconds, second after 25 seconds, third after 125 seconds (if task queue is empty that is)
@celery.task(bind=True, retry_backoff=5, max_retries=3, time_limit=6000)
@celery.task(bind=True, retry_backoff=5, max_retries=3, soft_time_limit=6000)
def annotate_variant(self, annotation_queue_id, job_config):
"""Background task for running the annotation service"""
self.update_state(state='PROGRESS')
Expand Down
10 changes: 5 additions & 5 deletions src/frontend_celery/webapp/upload/upload_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@
from celery.exceptions import Ignore
from werkzeug.exceptions import abort
import traceback

from celery.exceptions import SoftTimeLimitExceeded



def start_publish(variant_ids, options, user_id, user_roles, conn: Connection):
upload_heredicare = options["do_heredicare"]
upload_clinvar = options["do_clinvar"]
publish_queue_id = conn.insert_publish_request(user_id, upload_heredicare, upload_clinvar, variant_ids)

task = publish.apply_async(args = [publish_queue_id, variant_ids, options, user_roles])
task_id = task.id

Expand All @@ -32,7 +32,7 @@ def start_publish(variant_ids, options, user_id, user_roles, conn: Connection):



@celery.task(bind=True, retry_backoff=5, max_retries=3, time_limit=600)
@celery.task(bind=True, retry_backoff=5, max_retries=3, soft_time_limit=6000)
def publish(self, publish_queue_id, variant_ids, options, user_roles):
"""Background task for adding all tasks for publishing variants"""
#from frontend_celery.webapp.utils.variant_importer import import_variants
Expand Down Expand Up @@ -131,7 +131,7 @@ def start_upload_one_variant_clinvar(variant_id, publish_queue_id, options, user
# this uses exponential backoff in case there is a http error
# this will retry 3 times before giving up
# first retry after 5 seconds, second after 25 seconds, third after 125 seconds (if task queue is empty that is)
@celery.task(bind=True, retry_backoff=5, max_retries=3, time_limit=600)
@celery.task(bind=True, retry_backoff=5, max_retries=3, soft_time_limit=600)
def clinvar_upload_one_variant(self, variant_id, user_roles, options, clinvar_accession, publish_clinvar_queue_id):
"""Background task for uploading one variant and its consensus classification to HerediCare. It also updates the consensus classification in case the variant is already known to HerediCare"""
#from frontend_celery.webapp.utils.variant_importer import fetch_heredicare
Expand Down Expand Up @@ -278,7 +278,7 @@ def start_upload_one_variant_heredicare(variant_id, publish_queue_id, options, u
# this uses exponential backoff in case there is a http error
# this will retry 3 times before giving up
# first retry after 5 seconds, second after 25 seconds, third after 125 seconds (if task queue is empty that is)
@celery.task(bind=True, retry_backoff=5, max_retries=3, time_limit=600)
@celery.task(bind=True, retry_backoff=5, max_retries=3, soft_time_limit=600)
def heredicare_upload_one_variant(self, variant_id, vid, user_roles, options, publish_heredicare_queue_id):
"""Background task for uploading one variant and its consensus classification to HerediCare. It also updates the consensus classification in case the variant is already known to HerediCare"""
self.update_state(state='PROGRESS')
Expand Down

0 comments on commit 6fbfe74

Please sign in to comment.