diff --git a/src/annotation_service/main.py b/src/annotation_service/main.py index 3d4487b8..58741df1 100644 --- a/src/annotation_service/main.py +++ b/src/annotation_service/main.py @@ -113,6 +113,12 @@ def process_one_request(annotation_queue_id, job_config = get_default_job_config conn.set_annotation_queue_status(annotation_queue_id, status="progress") print("processing request " + str(annotation_queue_id) + " annotating variant: " + "-".join([variant.chrom, str(variant.pos), variant.ref, variant.alt]) + " with id: " + str(variant.id) ) + # invalidate variant list download vcf files + list_ids = conn.get_list_ids_with_variant(variant_id) + for list_id in list_ids: + download_queue_ids = conn.get_valid_download_queue_ids(list_id, "list_download") + for download_queue_id in download_queue_ids: + conn.invalidate_download_queue(download_queue_id) # write a vcf with the current variant to disc vcf_path = get_temp_vcf_path(annotation_queue_id) diff --git a/src/common/db_IO.py b/src/common/db_IO.py index 93a1094c..40994431 100644 --- a/src/common/db_IO.py +++ b/src/common/db_IO.py @@ -754,7 +754,8 @@ def is_hgnc(self, string): def get_variants_page_merged(self, page, page_size, sort_by, include_hidden, user_id, ranges = None, genes = None, consensus = None, user = None, automatic_splicing = None, automatic_protein = None, hgvs = None, variant_ids_oi = None, external_ids = None, cdna_ranges = None, annotation_restrictions = None, - include_heredicare_consensus = False, variant_strings = None, variant_types = None): + include_heredicare_consensus = False, variant_strings = None, variant_types = None, clinvar_upload_states = None, + heredicare_upload_states = None): # get one page of variants determined by offset & pagesize prefix = "SELECT id, chr, pos, ref, alt FROM variant" @@ -964,6 +965,30 @@ def get_variants_page_merged(self, page, page_size, sort_by, include_hidden, use actual_information += tuple(automatic_without_dash) new_constraints = "id IN (" + new_constraints_inner + ")" postfix = self.add_constraints_to_command(postfix, new_constraints) + if clinvar_upload_states is not None and len(clinvar_upload_states) > 0: + new_constraints_inner = """ + SELECT variant_id FROM publish_clinvar_queue WHERE id IN ( + SELECT MAX(id) FROM publish_clinvar_queue WHERE status != 'skipped' GROUP BY variant_id + ) AND status IN + """ + placeholders = self.get_placeholders(len(clinvar_upload_states)) + new_constraints_inner += placeholders + new_constraints_inner = functions.enbrace(new_constraints_inner) + new_constraints = "id IN " + new_constraints_inner + postfix = self.add_constraints_to_command(postfix, new_constraints) + actual_information += tuple(clinvar_upload_states) + if heredicare_upload_states is not None and len(heredicare_upload_states) > 0: + new_constraints_inner = """ + SELECT DISTINCT variant_id FROM publish_heredicare_queue WHERE id IN ( + SELECT MAX(id) FROM publish_heredicare_queue WHERE status != 'skipped' GROUP BY vid + ) AND status IN + """ + placeholders = self.get_placeholders(len(heredicare_upload_states)) + new_constraints_inner += placeholders + new_constraints_inner = functions.enbrace(new_constraints_inner) + new_constraints = "id IN " + new_constraints_inner + postfix = self.add_constraints_to_command(postfix, new_constraints) + actual_information += tuple(heredicare_upload_states) if hgvs is not None and len(hgvs) > 0: all_variants = [] for hgvs_string in hgvs: @@ -1060,7 +1085,7 @@ def get_variants_page_merged(self, page, page_size, sort_by, include_hidden, use offset = (page - 1) * page_size command = command + " LIMIT %s, %s" actual_information += (offset, page_size) - #print(command % actual_information) + print(command % actual_information) self.cursor.execute(command, actual_information) variants_raw = self.cursor.fetchall() @@ -2214,6 +2239,12 @@ def get_variant_ids_from_list(self, list_id): result = [str(x[0]) for x in result] # extract variant_id return result + def get_list_ids_with_variant(self, variant_id): + command = "SELECT DISTINCT list_id FROM list_variants WHERE variant_id = %s" + self.cursor.execute(command, (variant_id, )) + result = self.cursor.fetchall() + return [x[0] for x in result] + # list_id to get the right list # list_name is the value which will be updated def update_user_variant_list(self, list_id, new_list_name, public_read, public_edit): @@ -3323,7 +3354,8 @@ def clear_heredicare_annotation(self, variant_id): self.conn.commit() def get_enumtypes(self, tablename, columnname): - allowed_tablenames = ["consensus_classification", "user_classification", "variant", "annotation_queue", "automatic_classification", "sv_variant", "user_classification_criteria_applied", "consensus_classification_criteria_applied", "import_variant_queue"] + allowed_tablenames = ["consensus_classification", "user_classification", "variant", "annotation_queue", "automatic_classification", "sv_variant", + "user_classification_criteria_applied", "consensus_classification_criteria_applied", "import_variant_queue", "publish_heredicare_queue"] if tablename in allowed_tablenames: # prevent sql injection command = "SHOW COLUMNS FROM " + tablename + " WHERE FIELD = %s" else: @@ -3887,6 +3919,12 @@ def get_clinvar_queue_entries(self, publish_queue_ids: list, variant_id): self.cursor.execute(command, actual_information) result = self.cursor.fetchall() return result + + def get_unique_publish_clinvar_queue_status(self): + command = "SELECT DISTINCT status FROM publish_clinvar_queue WHERE status != 'skipped'" + self.cursor.execute(command) + result = self.cursor.fetchall() + return [x[0] for x in result] # DEPRECATED: delete later #def check_publish_queue_id(self, publish_queue_id): @@ -4065,4 +4103,58 @@ def insert_classification_final_class(self, classification_scheme_id, final_clas SELECT %s, %s FROM DUAL WHERE NOT EXISTS (SELECT * FROM classification_final_class WHERE `classification_scheme_id`=%s AND `classification`=%s LIMIT 1)""" self.cursor.execute(command, (classification_scheme_id, final_class, classification_scheme_id, final_class)) - self.conn.commit() \ No newline at end of file + self.conn.commit() + + + + + def insert_download_request(self, list_id, request_type, filename): + command = "INSERT INTO download_queue (identifier, type, filename) VALUES (%s, %s, %s)" + self.cursor.execute(command, (list_id, request_type, filename)) + self.conn.commit() + return self.get_most_recent_download_queue_id(list_id, request_type) + + def get_most_recent_download_queue_id(self, list_id, request_type): + command = "SELECT MAX(id) FROM download_queue WHERE identifier = %s AND type = %s AND is_valid = 1" + self.cursor.execute(command, (list_id, request_type)) + result = self.cursor.fetchone() + return result[0] + + def update_download_queue_celery_task_id(self, download_queue_id, celery_task_id): + command = "UPDATE download_queue SET celery_task_id = %s WHERE id = %s" + self.cursor.execute(command, (celery_task_id, download_queue_id)) + self.conn.commit() + + def close_download_queue(self, status, download_queue_id, message): + command = "UPDATE download_queue SET status = %s, finished_at = NOW(), message = %s WHERE id = %s" + self.cursor.execute(command, (status, message, download_queue_id)) + self.conn.commit() + + def update_download_queue_status(self, download_queue_id, status, message): + command = "UPDATE download_queue SET status = %s, message = %s WHERE id = %s" + self.cursor.execute(command, (status, message, download_queue_id)) + self.conn.commit() + + def get_download_queue(self, download_queue_id, minimal_info = False): + if download_queue_id is None: + return None + info = "requested_at, status, finished_at, message, is_valid, filename, identifier, type, celery_task_id" + if minimal_info: + info = "requested_at, status, finished_at, message, is_valid" + command = "SELECT " + info + " FROM download_queue WHERE id = %s" + self.cursor.execute(command, (download_queue_id, )) + result = self.cursor.fetchone() + return result + + def get_valid_download_queue_ids(self, identifier, request_type): + command = "SELECT id FROM download_queue WHERE identifier = %s AND type = %s AND is_valid = 1" + self.cursor.execute(command, (identifier, request_type)) + data = self.cursor.fetchall() + return [x[0] for x in data] + + def invalidate_download_queue(self, download_queue_id): + # invalidate in db + command = "UPDATE download_queue SET is_valid = 0 WHERE id = %s" + self.cursor.execute(command, (download_queue_id, )) + self.conn.commit() + \ No newline at end of file diff --git a/src/common/functions.py b/src/common/functions.py index 86f3ae01..61ca9c5a 100644 --- a/src/common/functions.py +++ b/src/common/functions.py @@ -14,6 +14,13 @@ import uuid from functools import cmp_to_key import pathlib +import werkzeug + + + + +def is_secure_filename(filename): + return werkzeug.utils.secure_filename(filename) == filename def prettyprint_json(json_obj, func = print): pretty_json = json.dumps(json_obj, indent=2) @@ -168,11 +175,12 @@ def convert_none_infinite(x): else: return x -def execute_command(command, process_name, use_prefix_error_log = True): - completed_process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - std_out, std_err = completed_process.communicate()#[1].strip().decode("utf-8") # catch errors and warnings and convert to str +def execute_command(command, process_name, use_prefix_error_log = True, stdout=subprocess.PIPE): + completed_process = subprocess.Popen(command, stdout=stdout, stderr=subprocess.PIPE) + command_output, std_err = completed_process.communicate()#[1].strip().decode("utf-8") # catch errors and warnings and convert to str std_err = std_err.strip().decode("utf-8") - command_output = std_out.strip().decode("utf-8") + if command_output is not None: + command_output = command_output.strip().decode("utf-8") err_msg = "" if completed_process.returncode != 0: if use_prefix_error_log: @@ -672,9 +680,9 @@ def enpercent(string): return '%' + string + '%' -def get_random_temp_file(fileending, filename_ext = ""): +def get_random_temp_file(fileending, filename_ext = "", folder = tempfile.gettempdir()): filename = collect_info(str(uuid.uuid4()), "", filename_ext, sep = '_') - return os.path.join(tempfile.gettempdir(), filename + "." + str(fileending.strip('.'))).strip('.') + return os.path.join(folder, filename + "." + str(fileending.strip('.'))).strip('.') def rm(path): if os.path.exists(path): @@ -970,4 +978,8 @@ def one_to_three_letter(s): if s == "Y": return "Tyr" if s == "V": return "Val" if s == "X": return "Ter" - return "-" \ No newline at end of file + return "-" + + + + diff --git a/src/common/paths.py b/src/common/paths.py index 5203dee0..451dfce8 100644 --- a/src/common/paths.py +++ b/src/common/paths.py @@ -29,6 +29,7 @@ def joinpaths(path, *paths): resources_dir = joinpaths(workdir, 'resources') logs_dir = joinpaths(workdir, 'logs') downloads_dir = joinpaths(workdir, 'downloads') + download_variant_list_dir = joinpaths(downloads_dir, 'variant_lists') # classified variants folders are calculated in tasks.py #report_dir = joinpaths(workdir, "") #'downloads/consensus_classification_reports/' diff --git a/src/frontend_celery/webapp/download/download_functions.py b/src/frontend_celery/webapp/download/download_functions.py index 521fdb36..a566ec44 100644 --- a/src/frontend_celery/webapp/download/download_functions.py +++ b/src/frontend_celery/webapp/download/download_functions.py @@ -53,12 +53,11 @@ def generate_consensus_only_vcf(variant_types, dummy = False): variant_ids_oi = [] if not dummy: variant_ids_oi = conn.get_variant_ids_with_consensus_classification(variant_types = variant_types) - vcf_file_buffer, status, vcf_errors, err_msg = get_vcf(variant_ids_oi, conn, get_variant_vcf_line_only_consensus, check_vcf=False) - if status == "error": - raise IOError("There was an error during vcf generation: " + str(vcf_errors) + "; " + err_msg) - functions.buffer_to_file_system(vcf_file_buffer, path_to_download) + status, message = write_vcf_file(variant_ids_oi, path_to_download, conn, get_variant_vcf_line_only_consensus, check_vcf = False) conn.close() - + if status == "error": + raise IOError("There was an error during vcf generation: " + str(message)) + with open(last_dump_path, 'w') as last_dump_file: last_dump_file.write(last_dump_date) @@ -121,26 +120,70 @@ def get_vcf(variants_oi, conn, worker=get_variant_vcf_line, check_vcf=True): return buffer, status, "", "" -#import time -#def test_large_download(): -# for i in range(50): -# yield str(i).encode() -# print(i) -# time.sleep(1) -# -# -def get_vcf_stream(variants_oi, conn, worker=get_variant_vcf_line): - for id in variants_oi: - info_headers, variant_vcf = worker(id, conn) - yield variant_vcf - #all_variant_vcf_lines.append(variant_vcf) - #final_info_headers = merge_info_headers(final_info_headers, info_headers) - - #printable_info_headers = list(final_info_headers.values()) - #printable_info_headers.sort() - #functions.write_vcf_header(printable_info_headers, lambda l: yield_something(l.encode()), tail='\n') -def yield_something(val): - yield val + +def write_vcf_file(variant_ids, path, conn: Connection, worker=get_variant_vcf_line, check_vcf = True): + status = "success" + message = "" + + # write file in reverse to disc + generator = get_vcf_stream(variant_ids, conn, worker) + with open(path, "w") as file: + for line in generator: + file.write(line) + + # reverse vcf + path2 = path + ".rev" + command = ["tac", path] + with open(path2, "w") as f: + returncode, serr, sout = functions.execute_command(command, "tac", stdout = f) + if returncode != 0: + status = "error" + message = functions.collect_info(message, "", serr) + message = functions.collect_info(message, "", sout) + functions.rm(path2) + return status, message + functions.rm(path) + returncode, err_msg, command_output = functions.execute_command(["mv", path2, path], "mv") + if returncode != 0: return "error", err_msg + + # check vcf + if check_vcf: + returncode, err_msg, vcf_errors = functions.check_vcf(path) + if returncode != 0: + status = "error" + message = functions.collect_info(message, "", err_msg) + message = functions.collect_info(message, "", vcf_errors) + return status, message + + return status, message + + +# this is a generator to get a vcf file in reverse order +def get_vcf_stream(variant_ids, conn, worker=get_variant_vcf_line): + final_info_headers = {} + for variant_id in variant_ids: + info_headers, variant_vcf = worker(variant_id, conn) + yield variant_vcf + "\n" + final_info_headers = merge_info_headers(final_info_headers, info_headers) + + printable_info_headers = list(final_info_headers.values()) + printable_info_headers.sort() + buffer = io.StringIO() + functions.write_vcf_header(printable_info_headers, lambda l: buffer.write(l), tail='\n') + buffer.seek(0) + for line in reversed(list(buffer)): + yield line + + +def stream_file(file_path, remove_after = False): + file = open(file_path, 'r') + yield from file + file.close() + if remove_after: + os.remove(file_path) + + + diff --git a/src/frontend_celery/webapp/download/download_routes.py b/src/frontend_celery/webapp/download/download_routes.py index 3b6d33f8..20de00af 100644 --- a/src/frontend_celery/webapp/download/download_routes.py +++ b/src/frontend_celery/webapp/download/download_routes.py @@ -8,7 +8,8 @@ from common.db_IO import Connection import common.paths as paths from ..utils import * -from . import download_functions +from . import download_functions, download_tasks +import werkzeug download_blueprint = Blueprint( @@ -33,19 +34,54 @@ def variant(): redirect_url = url_for('variant.display', variant_id = variant_id) download_file_name = "variant_" + str(variant_id) + ".vcf" - vcf_file_buffer, status, vcf_errors, err_msg = download_functions.get_vcf([variant_id], conn, check_vcf=not request.args.get('force', False)) + #vcf_file_buffer, status, vcf_errors, err_msg = download_functions.get_vcf([variant_id], conn, check_vcf=not request.args.get('force', False)) + filepath = functions.get_random_temp_file(fileending = ".vcf", filename_ext = "variant_download_" + str(variant_id)) + status, message = download_functions.write_vcf_file([variant_id], filepath, conn, check_vcf=not request.args.get('force', False)) - if status in ['redirect', 'error']: - flash({"message": "Error during VCF Check: " + vcf_errors + " with error message: " + err_msg + ". Download it anyway", + if status in ['error']: + flash({"message": "Error during VCF generation: " + message + ". Download it anyway", "link": force_url}, "alert-danger") - current_app.logger.error(get_preferred_username() + " tried to download a vcf which contains errors: " + vcf_errors + ". For variant ids: " + str(variant_id)) + current_app.logger.error(get_preferred_username() + " tried to download a vcf which contains errors: " + message + ". For variant ids: " + str(variant_id)) return redirect(redirect_url) current_app.logger.info(get_preferred_username() + " downloaded vcf of variant id: " + str(variant_id)) - return send_file(vcf_file_buffer, as_attachment=True, download_name=download_file_name, mimetype="text/vcf") + return current_app.response_class( + download_functions.stream_file(filepath, remove_after = True), + content_type = "text/event-stream", + headers={'Content-Disposition': f'attachment; filename={download_file_name}', 'X-Accel-Buffering': 'no'} + ) +## listens on get parameter: list_id +#@download_blueprint.route('/download/vcf/variant_list') +#@require_permission(['read_resources']) +#def variant_list(): +# conn = get_connection() +# +# list_id = request.args.get('list_id') +# require_valid(list_id, "user_variant_lists", conn) +# +# # check that the logged in user is the owner of this list +# require_list_permission(list_id, ['read'], conn) +# variant_ids_oi = conn.get_variant_ids_from_list(list_id) +# +# force_url = url_for("download.variant_list", list_id = list_id, force = True) +# redirect_url = url_for("user.my_lists", view = list_id) +# download_file_name = "list_" + str(list_id) + ".vcf" +# +# vcf_file_buffer, status, vcf_errors, err_msg = download_functions.get_vcf(variant_ids_oi, conn) +# +# if status == "redirect": +# flash({"message": "Error during VCF Check: " + vcf_errors + " with error message: " + err_msg + ". Download it anyway", +# "link": force_url}, "alert-danger") +# current_app.logger.error(get_preferred_username() + " tried to download a vcf which contains errors: " + vcf_errors + ". For variant list " + str(list_id)) +# return redirect(redirect_url) +# +# current_app.logger.info(get_preferred_username() + " downloaded vcf of variant list: " + str(list_id)) +# +# return send_file(vcf_file_buffer, as_attachment=True, download_name=download_file_name, mimetype="text/vcf") + # listens on get parameter: list_id @download_blueprint.route('/download/vcf/variant_list') @require_permission(['read_resources']) @@ -57,37 +93,60 @@ def variant_list(): # check that the logged in user is the owner of this list require_list_permission(list_id, ['read'], conn) - variant_ids_oi = conn.get_variant_ids_from_list(list_id) - force_url = url_for("download.variant_list", list_id = list_id, force = True) - redirect_url = url_for("user.my_lists", view = list_id) - download_file_name = "list_" + str(list_id) + ".vcf" + download_queue_id = conn.get_most_recent_download_queue_id(list_id, "list_download") + if download_queue_id is None: + flash("No download available! Generate the VCF first.", "alert-danger") + return redirect(url_for("user.my_lists", view=list_id)) + + download_queue = conn.get_download_queue(download_queue_id) + file_name = download_queue[5] + file_path = paths.download_variant_list_dir + "/" + file_name + is_valid = download_queue[1] + if not is_valid: + flash("No download available! Generate the VCF first.", "alert-danger") + return redirect(url_for("user.my_lists", view=list_id)) + if not functions.is_secure_filename(file_name): + flash("Invalid filename", "alert-danger") + return redirect(url_for("user.my_lists", view=list_id)) + if not os.path.exists(file_path): + flash("No download available! Generate the VCF first.", "alert-danger") + return redirect(url_for("user.my_lists", view=list_id)) - vcf_file_buffer, status, vcf_errors, err_msg = download_functions.get_vcf(variant_ids_oi, conn) + current_app.logger.info(get_preferred_username() + " downloaded vcf of variant list: " + str(list_id)) - if status == "redirect": - flash({"message": "Error during VCF Check: " + vcf_errors + " with error message: " + err_msg + ". Download it anyway", - "link": force_url}, "alert-danger") - current_app.logger.error(get_preferred_username() + " tried to download a vcf which contains errors: " + vcf_errors + ". For variant list " + str(list_id)) - return redirect(redirect_url) + download_filename = "list_" + str(list_id) + ".vcf" + return current_app.response_class( + download_functions.stream_file(file_path), + content_type = "text/event-stream", + headers={'Content-Disposition': f'attachment; filename={download_filename}', 'X-Accel-Buffering': 'no'} + ) - current_app.logger.info(get_preferred_username() + " downloaded vcf of variant list: " + str(list_id)) + +@download_blueprint.route('/generate/vcf/variant_list') +@require_permission(['read_resources']) +def generate_variant_list_vcf(): + conn = get_connection() + + list_id = request.args.get('list_id') + require_valid(list_id, "user_variant_lists", conn) + + # check that the logged in user is the owner of this list + require_list_permission(list_id, ['read'], conn) + + download_queue_id = conn.get_most_recent_download_queue_id(list_id, "list_download") + download_queue = conn.get_download_queue(download_queue_id) + if download_queue is not None: + if download_queue[1] in ["pending", "progress", "retry"]: + return "skipped" - return send_file(vcf_file_buffer, as_attachment=True, download_name=download_file_name, mimetype="text/vcf") + download_queue_id = download_tasks.start_generate_list_vcf(list_id, session["user"]["roles"], conn) + return "success" -from flask import Response, stream_with_context -#@download_blueprint.route('/download/test') -#@require_permission(["admin_resources"]) -#def download_test(): -# return Response( -# stream_with_context(download_functions.test_large_download()), -# headers={'Content-Disposition': 'attachment; filename=test.txt'} -# ) -# -@download_blueprint.route('/download/test_vcf') -@require_permission(["admin_resources"]) -def download_test_vcf(): +@download_blueprint.route('/generate/vcf/variant_list/status') +@require_permission(['read_resources']) +def generate_variant_list_vcf_status(): conn = get_connection() list_id = request.args.get('list_id') @@ -95,13 +154,61 @@ def download_test_vcf(): # check that the logged in user is the owner of this list require_list_permission(list_id, ['read'], conn) - variant_ids_oi = conn.get_variant_ids_from_list(list_id) - return Response( - stream_with_context(download_functions.get_vcf_stream(variant_ids_oi, conn)), - content_type = "text/event-stream", - headers={'Content-Disposition': 'attachment; filename=test.txt', 'X-Accel-Buffering': 'no'} - ) + download_queue_id = conn.get_most_recent_download_queue_id(list_id, "list_download") + download_queue_raw = conn.get_download_queue(download_queue_id, minimal_info = True) #requested_at, status, finished_at, message, is_valid + + if download_queue_raw is None: + return jsonify({ + "requested_at": "", + "status": "no_vcf", + "finished_at": "", + "message": "", + "is_valid": 0 + }) + + return jsonify({ + "requested_at": download_queue_raw[0], + "status": download_queue_raw[1], + "finished_at": download_queue_raw[2], + "message": download_queue_raw[3], + "is_valid": download_queue_raw[4] + }) + + + +#@download_blueprint.route('/download/test') +#@require_permission(["admin_resources"]) +#def download_test(): +# import time +# def test_large_download(): +# for i in range(50): +# yield str(i).encode() +# print(i) +# time.sleep(1) +# return Response( +# stream_with_context(download_functions.test_large_download()), +# content_type = "text/event-stream", +# headers={'Content-Disposition': 'attachment; filename=test.txt', 'X-Accel-Buffering': 'no'} +# ) + +#@download_blueprint.route('/download/test_vcf') +#@require_permission(["admin_resources"]) +#def download_test_vcf(): +# conn = get_connection() +# +# list_id = request.args.get('list_id') +# require_valid(list_id, "user_variant_lists", conn) +# +# # check that the logged in user is the owner of this list +# require_list_permission(list_id, ['read'], conn) +# variant_ids_oi = conn.get_variant_ids_from_list(list_id) +# +# return Response( +# stream_with_context(download_functions.get_vcf_stream(variant_ids_oi, conn)), +# content_type = "text/event-stream", +# headers={'Content-Disposition': 'attachment; filename=test.txt', 'X-Accel-Buffering': 'no'} +# ) # listens on get parameter: raw diff --git a/src/frontend_celery/webapp/download/download_tasks.py b/src/frontend_celery/webapp/download/download_tasks.py new file mode 100644 index 00000000..1676e41d --- /dev/null +++ b/src/frontend_celery/webapp/download/download_tasks.py @@ -0,0 +1,100 @@ +from urllib.error import HTTPError +import sys +from os import path +sys.path.append(path.dirname(path.dirname(path.abspath(__file__)))) +from webapp import celery, utils +from common import functions +from common import paths +from common.db_IO import Connection +from celery.exceptions import Ignore +import time + +from . import download_functions +# errors +from urllib.error import HTTPError +from mysql.connector import Error, InternalError +import traceback + + + +########################################################### +############## START VARIANT TSV BULK IMPORT ############## +########################################################### + +def start_generate_list_vcf(list_id, user_roles, conn: Connection): + functions.mkdir_recursive(paths.download_variant_list_dir) + filename = functions.get_random_temp_file(fileending = ".vcf", filename_ext = "_".join(["list", str(list_id), functions.get_today()]), folder = "") + + request_type = "list_download" + + utils.invalidate_download_queue(list_id, request_type, conn) + download_queue_id = conn.insert_download_request(list_id, request_type, filename) + + task = generate_list_vcf.apply_async(args=[list_id, user_roles, download_queue_id, filename]) # start task + task_id = task.id + + conn.update_download_queue_celery_task_id(download_queue_id, celery_task_id = task_id) # save the task id for status updates + + return download_queue_id + + +@celery.task(bind=True, retry_backoff=5, max_retries=3) +def generate_list_vcf(self, list_id, user_roles, download_queue_id, filename): + """ Background task for importing generating vcf file of variant list """ + self.update_state("PROGRESS") + + try: + conn = Connection(user_roles) + message = "" + status = "success" + + conn.update_download_queue_status(download_queue_id, status = "progress", message = "") + + if not functions.is_secure_filename(filename): # just for safety - might not be required + status = "error" + message = "Invalid filename provided!" + else: + variant_ids_oi = conn.get_variant_ids_from_list(list_id) + filepath = paths.download_variant_list_dir + "/" + filename + status, message = download_functions.write_vcf_file(variant_ids_oi, filepath, conn) + + except InternalError as e: + # deadlock: code 1213 + status = "retry" + message = "Attempting retry because of database error: " + str(e) + ' ' + traceback.format_exc() + except Error as e: + status = "error" + message = "There was a database error: " + str(e) + ' ' + traceback.format_exc() + except HTTPError as e: + status = "retry" + message = "Attempting retry because of http error: " + str(e) + ' ' + traceback.format_exc() + except Exception as e: + status = "error" + message = "There was a runtime error: " + str(e) + ' ' + traceback.format_exc() + + print(status) + #print(message) + + if status != "retry": + conn.close_download_queue(status, download_queue_id, message = message[0:1000]) + else: + conn.update_download_queue_status(download_queue_id, status = status, message = message[0:1000]) + + conn.close() + + if status == 'error': + self.update_state(state="FAILURE", meta={ + 'exc_type': "Runtime error", + 'exc_message': message, + 'custom': '...' + }) + raise Ignore() + elif status == "retry": + self.update_state(state="RETRY", meta={ + 'exc_type': "Runtime error", + 'exc_message': message, + 'custom': '...'}) + generate_list_vcf.retry() + else: + self.update_state(state="SUCCESS") + diff --git a/src/frontend_celery/webapp/static/js/my_lists.js b/src/frontend_celery/webapp/static/js/my_lists.js index ddf78b76..e58c2b97 100644 --- a/src/frontend_celery/webapp/static/js/my_lists.js +++ b/src/frontend_celery/webapp/static/js/my_lists.js @@ -9,6 +9,8 @@ $(document).ready(function() const page_size = flask_data.dataset.pageSize const base_delete_action_url = flask_data.dataset.deleteAction const view_list = flask_data.dataset.viewList + const generate_list_vcf_url = flask_data.dataset.generateListVcfUrl + const generate_list_vcf_status_url = flask_data.dataset.generateListVcfStatusUrl // edit / create button functionality $('#list-modal-submit').click(function(){ @@ -17,25 +19,24 @@ $(document).ready(function() $('#list-modal-form').submit(); }); - $('#export_to_vcf_button').click(function(){ + $('#generate_list_vcf_button').click(function(){ //document.getElementById("export_to_vcf_worker").click() - console.log(view_list) - console.log("started") + console.log(generate_list_vcf_url) $.ajax({ type: 'GET', - url: "/download/vcf/variant_list", - data: {'list_id': view_list}, + url: generate_list_vcf_url, success: function(returnval, status, request) { - console.log(returnval) + show_vcf_gen_status('bg-secondary', "Annotation requested successfully", "Annotation requested") + update_vcf_generation_status(generate_list_vcf_status_url); }, error: function(xhr, status, error) { console.log(xhr) - console.log(status) - console.log(error) + show_vcf_gen_status("bg-danger", "The variant could not be annotated. Either the service is unreachable or some other unexpected exception occured. You can try to reload this page or issue another annotation to fix this. If that does not help try again later. Status code is: " + xhr.status, "Internal error") + $('#reannotate_button').attr('disabled', false) } }); - console.log("after download") }); + update_vcf_generation_status(generate_list_vcf_status_url) var list_id = $('#current-list-id')[0].innerText @@ -103,6 +104,92 @@ $(document).ready(function() + +//