From 0ab95208c31e74a23a083e66dc1570476484f5e7 Mon Sep 17 00:00:00 2001 From: MarvinDo Date: Thu, 29 Aug 2024 10:55:00 +0200 Subject: [PATCH] improved heredicare upload status handling --- src/common/heredicare_interface.py | 79 ++++++++++++------- .../webapp/upload/upload_tasks.py | 5 +- .../webapp/variant/variant_functions.py | 2 +- .../webapp/variant/variant_routes.py | 8 +- 4 files changed, 61 insertions(+), 33 deletions(-) diff --git a/src/common/heredicare_interface.py b/src/common/heredicare_interface.py index a59639ff..8c046162 100644 --- a/src/common/heredicare_interface.py +++ b/src/common/heredicare_interface.py @@ -10,6 +10,7 @@ import re import json import urllib +import time #class Heredicare_Flask(): # def __init__(self, app=None): @@ -195,21 +196,27 @@ def iterate_pagination(self, start_url, project_type, items_key = "items"): result = [] url = start_url has_next = True - while has_next: + + retry = True + max_tries = 3 + backoff_mult = 20 + current_try = 0 + while has_next and retry and( current_try <= max_tries): status, message = self.introspect_token(project_type) # checks validity of the token and updates it if neccessary if status == 'error': break bearer, timestamp = self.get_saved_bearer(project_type) header = {"Authorization": "Bearer " + bearer} + time.sleep(current_try * backoff_mult) resp = requests.get(url, headers=header) - if resp.status_code == 401: # unauthorized - message = "ERROR: HerediCare API get post info endpoint returned an HTTP 401, unauthorized error. Attempting retry." - status = "retry" - break + if resp.status_code in [401, 503]: # unauthorized, service unavailable + message = "ERROR: HerediCare API endpoint returned an HTTP " + str(resp.status_code) + " in URL: " + str(url) + status = "error" + current_try += 1 elif resp.status_code != 200: # any other kind of error - message = "ERROR: HerediCare API get post info endpoint returned an HTTP " + str(resp.status_code) + " error: " + self.extract_error_message(resp.text) + " in URL: " + str(url) + message = "ERROR: HerediCare API endpoint returned an HTTP " + str(resp.status_code) + " error: " + self.extract_error_message(resp.text) + " in URL: " + str(url) status = "error" - break + retry = False else: # request was successful resp = resp.json(strict=False) new_items = resp[items_key] @@ -225,7 +232,9 @@ def iterate_pagination(self, start_url, project_type, items_key = "items"): message = "ERROR: response 'hasMore' attribute is true but no 'next' url found." status = "error" has_next = False # prevent infinite loop in case the next url is missing for some reason - break + retry = False + else: + retry = False # for safety, has_next is false anyway return status, message, result def get_post_regexes(self): @@ -311,7 +320,7 @@ def get_submission_status(self, submission_id): return finished_at, status, message status, message = self.introspect_token(project_type) # checks validity of the token and updates it if neccessary - if status == 'api_error': + if status == 'error': return finished_at, status, message url = self.get_url(project_type, "submission_status", [str(submission_id)]) @@ -321,7 +330,7 @@ def get_submission_status(self, submission_id): resp = requests.get(url, headers=header) if resp.status_code != 200: message = "ERROR: HerediCare API get submission id endpoint endpoint returned an HTTP " + str(resp.status_code) + " error: " + self.extract_error_message(resp.text) - status = "api_error" + status = "retry" else: # success resp = resp.json(strict=False) items = resp["items"] @@ -661,26 +670,40 @@ def _post_data(self, data): message = "" project_type = "upload" - status, message = self.introspect_token(project_type) # checks validity of the token and updates it if neccessary - if status == 'error': - return status, message + retry = True + max_tries = 3 + backoff_mult = 20 + current_try = 0 - url = self.get_url(project_type, "send_data") - bearer, timestamp = self.get_saved_bearer(project_type) - header = {"Authorization": "Bearer " + bearer} - resp = requests.post(url, headers=header, data=data) + while retry and (current_try <= max_tries): + status, message = self.introspect_token(project_type) # checks validity of the token and updates it if neccessary + if status == 'error': + break + + url = self.get_url(project_type, "send_data") + bearer, timestamp = self.get_saved_bearer(project_type) + header = {"Authorization": "Bearer " + bearer} - if resp.status_code == 401: # unauthorized - message = "ERROR: HerediCare API post variant data endpoint returned an HTTP 401, unauthorized error. Attempting retry." - status = "retry" - elif resp.status_code == 555: - message = "ERROR: HerediCare API post variant data endpoint returned an HTTP 555 error. Reason: " + urllib.parse.unquote(resp.headers.get("Error-Reason", "not provided")) - status = "error" - elif resp.status_code != 200: - message = "ERROR: HerediCare API post variant data endpoint returned an HTTP " + str(resp.status_code) + " error: " + self.extract_error_message(resp.text) - status = "error" - else: # success - print(resp.text) + time.sleep(current_try * backoff_mult) + resp = requests.post(url, headers=header, data=data) + + if resp.status_code in [401, 503]: # unauthorized, service unavailable --> retry these + message = "ERROR: HerediCare API post variant data endpoint returned an HTTP " + str(resp.status_code) + status = "error" + current_try += 1 + elif resp.status_code == 555: + message = "ERROR: HerediCare API post variant data endpoint returned an HTTP 555 error. Reason: " + urllib.parse.unquote(resp.headers.get("Error-Reason", "not provided")) + status = "error" + retry = False + elif resp.status_code != 200: + message = "ERROR: HerediCare API post variant data endpoint returned an HTTP " + str(resp.status_code) + " error: " + self.extract_error_message(resp.text) + status = "error" + retry = False + else: # success + status = "success" + message = "" + retry = False + #print(resp.text) return status, message diff --git a/src/frontend_celery/webapp/upload/upload_tasks.py b/src/frontend_celery/webapp/upload/upload_tasks.py index 2e71fdc3..a5452eab 100644 --- a/src/frontend_celery/webapp/upload/upload_tasks.py +++ b/src/frontend_celery/webapp/upload/upload_tasks.py @@ -247,7 +247,8 @@ def start_upload_one_variant_heredicare(variant_id, publish_queue_id, options, u for vid in heredicare_exclusive_vids: heredicare_variant, status, message = heredicare_interface.get_variant(vid) if status != 'success': - conn.update_publish_heredicare_queue_status(publish_heredicare_queue_id, status = "error", message = "Could not fetch vid from heredicare: " + message) + publish_heredicare_queue_id = conn.insert_publish_heredicare_request(vid, variant_id, publish_queue_id) + conn.update_publish_heredicare_queue_status(publish_heredicare_queue_id, status = "skipped", message = "Could not fetch vid from heredicare: " + message) return None if functions.trim_chr(variant.chrom) == str(functions.trim_chr(heredicare_variant['CHROM'])) and str(variant.pos) == str(heredicare_variant['POS_HG38']) and str(variant.alt) == str(heredicare_variant['ALT_HG38']) and str(variant.ref) == str(heredicare_variant['REF_HG38']) and vid not in vids: vids.append(vid) @@ -267,7 +268,7 @@ def start_upload_one_variant_heredicare(variant_id, publish_queue_id, options, u #elif requires_reannotation: # if the variant was reannotated before the last heredicare upload skip the new upload: reason: heredivar might not know that the variant was already inserted and would want to insert it again. or have outdated old data # conn.update_publish_heredicare_queue_status(publish_heredicare_queue_id, status = "error", message = "The annotation is older than the last upload to heredicare. Please reannotate first and then upload to HerediCaRe.") elif vid_list_status != "success": - conn.update_publish_heredicare_queue_status(publish_heredicare_queue_id, status = "error", message = "Could not fetch vid list: " + vid_list_message) + conn.update_publish_heredicare_queue_status(publish_heredicare_queue_id, status = "skipped", message = "Could not fetch vid list: " + vid_list_message) else: task = heredicare_upload_one_variant.apply_async(args=[variant_id, vid, user_roles, options, publish_heredicare_queue_id]) task_id = task.id diff --git a/src/frontend_celery/webapp/variant/variant_functions.py b/src/frontend_celery/webapp/variant/variant_functions.py index aafac30b..afb9fd4f 100644 --- a/src/frontend_celery/webapp/variant/variant_functions.py +++ b/src/frontend_celery/webapp/variant/variant_functions.py @@ -313,7 +313,7 @@ def handle_selected_literature(previous_selected_literature, classification_id, #heredicare_queue_entries: ALL heredicare queue entries until error or success is hit #publish_queue_heredicare_queue_entries: ONLY entries of publish_queue -def summarize_heredicare_status(heredicare_queue_entries, publish_queue): +def summarize_heredicare_status(heredicare_queue_entries, publish_queue, mrcc): summary = {"status": "unknown", "max_requested_at": "unknown", "insert_tasks_message": ""} if publish_queue is not None: # fresh upload - preferred diff --git a/src/frontend_celery/webapp/variant/variant_routes.py b/src/frontend_celery/webapp/variant/variant_routes.py index 63e47586..1abc2c8a 100644 --- a/src/frontend_celery/webapp/variant/variant_routes.py +++ b/src/frontend_celery/webapp/variant/variant_routes.py @@ -141,19 +141,23 @@ def display(variant_id=None, chr=None, pos=None, ref=None, alt=None): most_recent_publish_queue = conn.get_most_recent_publish_queue(variant_id = variant_id, upload_clinvar = True) publish_queue_ids_oi = conn.get_most_recent_publish_queue_ids_clinvar(variant_id) clinvar_queue_entries = check_update_clinvar_status(variant_id, publish_queue_ids_oi, conn) - clinvar_queue_entry_summary = variant_functions.summarize_clinvar_status(clinvar_queue_entries, most_recent_publish_queue) + # get current status of heredicare submission most_recent_publish_queue = conn.get_most_recent_publish_queue(variant_id = variant_id, upload_heredicare = True) publish_queue_ids_oi = conn.get_most_recent_publish_queue_ids_heredicare(variant_id) heredicare_queue_entries = check_update_heredicare_status(variant_id, publish_queue_ids_oi, conn) #most_recent_heredicare_queue_entries = conn.get_heredicare_queue_entries([most_recent_publish_queue.id], variant_id) - heredicare_queue_entry_summary = variant_functions.summarize_heredicare_status(heredicare_queue_entries, most_recent_publish_queue) + # get the variant and all its annotations # get this after updating the upload stati to display the most recent status of each upload variant = conn.get_variant(variant_id) + # summarize the stati for display + clinvar_queue_entry_summary = variant_functions.summarize_clinvar_status(clinvar_queue_entries, most_recent_publish_queue) + heredicare_queue_entry_summary = variant_functions.summarize_heredicare_status(heredicare_queue_entries, most_recent_publish_queue, variant.get_recent_consensus_classification()) + return render_template('variant/variant.html', lists = lists, variant = variant,