From 72a7c76b5a65955c9a03017926b56ba4da484be1 Mon Sep 17 00:00:00 2001 From: MarvinDo Date: Fri, 23 Aug 2024 15:32:23 +0200 Subject: [PATCH] added improved vcf generation and list download --- src/annotation_service/main.py | 6 + src/common/db_IO.py | 100 +++++++++- src/common/functions.py | 26 ++- src/common/paths.py | 1 + .../webapp/download/download_functions.py | 93 ++++++--- .../webapp/download/download_routes.py | 179 ++++++++++++++---- .../webapp/download/download_tasks.py | 100 ++++++++++ .../webapp/static/js/my_lists.js | 105 +++++++++- src/frontend_celery/webapp/static/js/utils.js | 18 ++ .../webapp/static/js/variant_addition.js | 29 +-- src/frontend_celery/webapp/tasks.py | 5 +- .../webapp/templates/index.html | 5 +- .../webapp/templates/macros.html | 30 +++ .../webapp/templates/user/my_lists.html | 19 +- .../webapp/upload/upload_routes.py | 3 + .../webapp/user/user_routes.py | 6 + .../webapp/utils/create_db_version.py | 8 +- .../webapp/utils/frontend_utils.py | 30 ++- .../webapp/utils/search_utils.py | 31 ++- .../webapp/variant/variant_routes.py | 6 + 20 files changed, 689 insertions(+), 111 deletions(-) create mode 100644 src/frontend_celery/webapp/download/download_tasks.py diff --git a/src/annotation_service/main.py b/src/annotation_service/main.py index 3d4487b8..58741df1 100644 --- a/src/annotation_service/main.py +++ b/src/annotation_service/main.py @@ -113,6 +113,12 @@ def process_one_request(annotation_queue_id, job_config = get_default_job_config conn.set_annotation_queue_status(annotation_queue_id, status="progress") print("processing request " + str(annotation_queue_id) + " annotating variant: " + "-".join([variant.chrom, str(variant.pos), variant.ref, variant.alt]) + " with id: " + str(variant.id) ) + # invalidate variant list download vcf files + list_ids = conn.get_list_ids_with_variant(variant_id) + for list_id in list_ids: + download_queue_ids = conn.get_valid_download_queue_ids(list_id, "list_download") + for download_queue_id in download_queue_ids: + conn.invalidate_download_queue(download_queue_id) # write a vcf with the current variant to disc vcf_path = get_temp_vcf_path(annotation_queue_id) diff --git a/src/common/db_IO.py b/src/common/db_IO.py index 93a1094c..40994431 100644 --- a/src/common/db_IO.py +++ b/src/common/db_IO.py @@ -754,7 +754,8 @@ def is_hgnc(self, string): def get_variants_page_merged(self, page, page_size, sort_by, include_hidden, user_id, ranges = None, genes = None, consensus = None, user = None, automatic_splicing = None, automatic_protein = None, hgvs = None, variant_ids_oi = None, external_ids = None, cdna_ranges = None, annotation_restrictions = None, - include_heredicare_consensus = False, variant_strings = None, variant_types = None): + include_heredicare_consensus = False, variant_strings = None, variant_types = None, clinvar_upload_states = None, + heredicare_upload_states = None): # get one page of variants determined by offset & pagesize prefix = "SELECT id, chr, pos, ref, alt FROM variant" @@ -964,6 +965,30 @@ def get_variants_page_merged(self, page, page_size, sort_by, include_hidden, use actual_information += tuple(automatic_without_dash) new_constraints = "id IN (" + new_constraints_inner + ")" postfix = self.add_constraints_to_command(postfix, new_constraints) + if clinvar_upload_states is not None and len(clinvar_upload_states) > 0: + new_constraints_inner = """ + SELECT variant_id FROM publish_clinvar_queue WHERE id IN ( + SELECT MAX(id) FROM publish_clinvar_queue WHERE status != 'skipped' GROUP BY variant_id + ) AND status IN + """ + placeholders = self.get_placeholders(len(clinvar_upload_states)) + new_constraints_inner += placeholders + new_constraints_inner = functions.enbrace(new_constraints_inner) + new_constraints = "id IN " + new_constraints_inner + postfix = self.add_constraints_to_command(postfix, new_constraints) + actual_information += tuple(clinvar_upload_states) + if heredicare_upload_states is not None and len(heredicare_upload_states) > 0: + new_constraints_inner = """ + SELECT DISTINCT variant_id FROM publish_heredicare_queue WHERE id IN ( + SELECT MAX(id) FROM publish_heredicare_queue WHERE status != 'skipped' GROUP BY vid + ) AND status IN + """ + placeholders = self.get_placeholders(len(heredicare_upload_states)) + new_constraints_inner += placeholders + new_constraints_inner = functions.enbrace(new_constraints_inner) + new_constraints = "id IN " + new_constraints_inner + postfix = self.add_constraints_to_command(postfix, new_constraints) + actual_information += tuple(heredicare_upload_states) if hgvs is not None and len(hgvs) > 0: all_variants = [] for hgvs_string in hgvs: @@ -1060,7 +1085,7 @@ def get_variants_page_merged(self, page, page_size, sort_by, include_hidden, use offset = (page - 1) * page_size command = command + " LIMIT %s, %s" actual_information += (offset, page_size) - #print(command % actual_information) + print(command % actual_information) self.cursor.execute(command, actual_information) variants_raw = self.cursor.fetchall() @@ -2214,6 +2239,12 @@ def get_variant_ids_from_list(self, list_id): result = [str(x[0]) for x in result] # extract variant_id return result + def get_list_ids_with_variant(self, variant_id): + command = "SELECT DISTINCT list_id FROM list_variants WHERE variant_id = %s" + self.cursor.execute(command, (variant_id, )) + result = self.cursor.fetchall() + return [x[0] for x in result] + # list_id to get the right list # list_name is the value which will be updated def update_user_variant_list(self, list_id, new_list_name, public_read, public_edit): @@ -3323,7 +3354,8 @@ def clear_heredicare_annotation(self, variant_id): self.conn.commit() def get_enumtypes(self, tablename, columnname): - allowed_tablenames = ["consensus_classification", "user_classification", "variant", "annotation_queue", "automatic_classification", "sv_variant", "user_classification_criteria_applied", "consensus_classification_criteria_applied", "import_variant_queue"] + allowed_tablenames = ["consensus_classification", "user_classification", "variant", "annotation_queue", "automatic_classification", "sv_variant", + "user_classification_criteria_applied", "consensus_classification_criteria_applied", "import_variant_queue", "publish_heredicare_queue"] if tablename in allowed_tablenames: # prevent sql injection command = "SHOW COLUMNS FROM " + tablename + " WHERE FIELD = %s" else: @@ -3887,6 +3919,12 @@ def get_clinvar_queue_entries(self, publish_queue_ids: list, variant_id): self.cursor.execute(command, actual_information) result = self.cursor.fetchall() return result + + def get_unique_publish_clinvar_queue_status(self): + command = "SELECT DISTINCT status FROM publish_clinvar_queue WHERE status != 'skipped'" + self.cursor.execute(command) + result = self.cursor.fetchall() + return [x[0] for x in result] # DEPRECATED: delete later #def check_publish_queue_id(self, publish_queue_id): @@ -4065,4 +4103,58 @@ def insert_classification_final_class(self, classification_scheme_id, final_clas SELECT %s, %s FROM DUAL WHERE NOT EXISTS (SELECT * FROM classification_final_class WHERE `classification_scheme_id`=%s AND `classification`=%s LIMIT 1)""" self.cursor.execute(command, (classification_scheme_id, final_class, classification_scheme_id, final_class)) - self.conn.commit() \ No newline at end of file + self.conn.commit() + + + + + def insert_download_request(self, list_id, request_type, filename): + command = "INSERT INTO download_queue (identifier, type, filename) VALUES (%s, %s, %s)" + self.cursor.execute(command, (list_id, request_type, filename)) + self.conn.commit() + return self.get_most_recent_download_queue_id(list_id, request_type) + + def get_most_recent_download_queue_id(self, list_id, request_type): + command = "SELECT MAX(id) FROM download_queue WHERE identifier = %s AND type = %s AND is_valid = 1" + self.cursor.execute(command, (list_id, request_type)) + result = self.cursor.fetchone() + return result[0] + + def update_download_queue_celery_task_id(self, download_queue_id, celery_task_id): + command = "UPDATE download_queue SET celery_task_id = %s WHERE id = %s" + self.cursor.execute(command, (celery_task_id, download_queue_id)) + self.conn.commit() + + def close_download_queue(self, status, download_queue_id, message): + command = "UPDATE download_queue SET status = %s, finished_at = NOW(), message = %s WHERE id = %s" + self.cursor.execute(command, (status, message, download_queue_id)) + self.conn.commit() + + def update_download_queue_status(self, download_queue_id, status, message): + command = "UPDATE download_queue SET status = %s, message = %s WHERE id = %s" + self.cursor.execute(command, (status, message, download_queue_id)) + self.conn.commit() + + def get_download_queue(self, download_queue_id, minimal_info = False): + if download_queue_id is None: + return None + info = "requested_at, status, finished_at, message, is_valid, filename, identifier, type, celery_task_id" + if minimal_info: + info = "requested_at, status, finished_at, message, is_valid" + command = "SELECT " + info + " FROM download_queue WHERE id = %s" + self.cursor.execute(command, (download_queue_id, )) + result = self.cursor.fetchone() + return result + + def get_valid_download_queue_ids(self, identifier, request_type): + command = "SELECT id FROM download_queue WHERE identifier = %s AND type = %s AND is_valid = 1" + self.cursor.execute(command, (identifier, request_type)) + data = self.cursor.fetchall() + return [x[0] for x in data] + + def invalidate_download_queue(self, download_queue_id): + # invalidate in db + command = "UPDATE download_queue SET is_valid = 0 WHERE id = %s" + self.cursor.execute(command, (download_queue_id, )) + self.conn.commit() + \ No newline at end of file diff --git a/src/common/functions.py b/src/common/functions.py index 86f3ae01..61ca9c5a 100644 --- a/src/common/functions.py +++ b/src/common/functions.py @@ -14,6 +14,13 @@ import uuid from functools import cmp_to_key import pathlib +import werkzeug + + + + +def is_secure_filename(filename): + return werkzeug.utils.secure_filename(filename) == filename def prettyprint_json(json_obj, func = print): pretty_json = json.dumps(json_obj, indent=2) @@ -168,11 +175,12 @@ def convert_none_infinite(x): else: return x -def execute_command(command, process_name, use_prefix_error_log = True): - completed_process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - std_out, std_err = completed_process.communicate()#[1].strip().decode("utf-8") # catch errors and warnings and convert to str +def execute_command(command, process_name, use_prefix_error_log = True, stdout=subprocess.PIPE): + completed_process = subprocess.Popen(command, stdout=stdout, stderr=subprocess.PIPE) + command_output, std_err = completed_process.communicate()#[1].strip().decode("utf-8") # catch errors and warnings and convert to str std_err = std_err.strip().decode("utf-8") - command_output = std_out.strip().decode("utf-8") + if command_output is not None: + command_output = command_output.strip().decode("utf-8") err_msg = "" if completed_process.returncode != 0: if use_prefix_error_log: @@ -672,9 +680,9 @@ def enpercent(string): return '%' + string + '%' -def get_random_temp_file(fileending, filename_ext = ""): +def get_random_temp_file(fileending, filename_ext = "", folder = tempfile.gettempdir()): filename = collect_info(str(uuid.uuid4()), "", filename_ext, sep = '_') - return os.path.join(tempfile.gettempdir(), filename + "." + str(fileending.strip('.'))).strip('.') + return os.path.join(folder, filename + "." + str(fileending.strip('.'))).strip('.') def rm(path): if os.path.exists(path): @@ -970,4 +978,8 @@ def one_to_three_letter(s): if s == "Y": return "Tyr" if s == "V": return "Val" if s == "X": return "Ter" - return "-" \ No newline at end of file + return "-" + + + + diff --git a/src/common/paths.py b/src/common/paths.py index 5203dee0..451dfce8 100644 --- a/src/common/paths.py +++ b/src/common/paths.py @@ -29,6 +29,7 @@ def joinpaths(path, *paths): resources_dir = joinpaths(workdir, 'resources') logs_dir = joinpaths(workdir, 'logs') downloads_dir = joinpaths(workdir, 'downloads') + download_variant_list_dir = joinpaths(downloads_dir, 'variant_lists') # classified variants folders are calculated in tasks.py #report_dir = joinpaths(workdir, "") #'downloads/consensus_classification_reports/' diff --git a/src/frontend_celery/webapp/download/download_functions.py b/src/frontend_celery/webapp/download/download_functions.py index 521fdb36..a566ec44 100644 --- a/src/frontend_celery/webapp/download/download_functions.py +++ b/src/frontend_celery/webapp/download/download_functions.py @@ -53,12 +53,11 @@ def generate_consensus_only_vcf(variant_types, dummy = False): variant_ids_oi = [] if not dummy: variant_ids_oi = conn.get_variant_ids_with_consensus_classification(variant_types = variant_types) - vcf_file_buffer, status, vcf_errors, err_msg = get_vcf(variant_ids_oi, conn, get_variant_vcf_line_only_consensus, check_vcf=False) - if status == "error": - raise IOError("There was an error during vcf generation: " + str(vcf_errors) + "; " + err_msg) - functions.buffer_to_file_system(vcf_file_buffer, path_to_download) + status, message = write_vcf_file(variant_ids_oi, path_to_download, conn, get_variant_vcf_line_only_consensus, check_vcf = False) conn.close() - + if status == "error": + raise IOError("There was an error during vcf generation: " + str(message)) + with open(last_dump_path, 'w') as last_dump_file: last_dump_file.write(last_dump_date) @@ -121,26 +120,70 @@ def get_vcf(variants_oi, conn, worker=get_variant_vcf_line, check_vcf=True): return buffer, status, "", "" -#import time -#def test_large_download(): -# for i in range(50): -# yield str(i).encode() -# print(i) -# time.sleep(1) -# -# -def get_vcf_stream(variants_oi, conn, worker=get_variant_vcf_line): - for id in variants_oi: - info_headers, variant_vcf = worker(id, conn) - yield variant_vcf - #all_variant_vcf_lines.append(variant_vcf) - #final_info_headers = merge_info_headers(final_info_headers, info_headers) - - #printable_info_headers = list(final_info_headers.values()) - #printable_info_headers.sort() - #functions.write_vcf_header(printable_info_headers, lambda l: yield_something(l.encode()), tail='\n') -def yield_something(val): - yield val + +def write_vcf_file(variant_ids, path, conn: Connection, worker=get_variant_vcf_line, check_vcf = True): + status = "success" + message = "" + + # write file in reverse to disc + generator = get_vcf_stream(variant_ids, conn, worker) + with open(path, "w") as file: + for line in generator: + file.write(line) + + # reverse vcf + path2 = path + ".rev" + command = ["tac", path] + with open(path2, "w") as f: + returncode, serr, sout = functions.execute_command(command, "tac", stdout = f) + if returncode != 0: + status = "error" + message = functions.collect_info(message, "", serr) + message = functions.collect_info(message, "", sout) + functions.rm(path2) + return status, message + functions.rm(path) + returncode, err_msg, command_output = functions.execute_command(["mv", path2, path], "mv") + if returncode != 0: return "error", err_msg + + # check vcf + if check_vcf: + returncode, err_msg, vcf_errors = functions.check_vcf(path) + if returncode != 0: + status = "error" + message = functions.collect_info(message, "", err_msg) + message = functions.collect_info(message, "", vcf_errors) + return status, message + + return status, message + + +# this is a generator to get a vcf file in reverse order +def get_vcf_stream(variant_ids, conn, worker=get_variant_vcf_line): + final_info_headers = {} + for variant_id in variant_ids: + info_headers, variant_vcf = worker(variant_id, conn) + yield variant_vcf + "\n" + final_info_headers = merge_info_headers(final_info_headers, info_headers) + + printable_info_headers = list(final_info_headers.values()) + printable_info_headers.sort() + buffer = io.StringIO() + functions.write_vcf_header(printable_info_headers, lambda l: buffer.write(l), tail='\n') + buffer.seek(0) + for line in reversed(list(buffer)): + yield line + + +def stream_file(file_path, remove_after = False): + file = open(file_path, 'r') + yield from file + file.close() + if remove_after: + os.remove(file_path) + + + diff --git a/src/frontend_celery/webapp/download/download_routes.py b/src/frontend_celery/webapp/download/download_routes.py index 3b6d33f8..20de00af 100644 --- a/src/frontend_celery/webapp/download/download_routes.py +++ b/src/frontend_celery/webapp/download/download_routes.py @@ -8,7 +8,8 @@ from common.db_IO import Connection import common.paths as paths from ..utils import * -from . import download_functions +from . import download_functions, download_tasks +import werkzeug download_blueprint = Blueprint( @@ -33,19 +34,54 @@ def variant(): redirect_url = url_for('variant.display', variant_id = variant_id) download_file_name = "variant_" + str(variant_id) + ".vcf" - vcf_file_buffer, status, vcf_errors, err_msg = download_functions.get_vcf([variant_id], conn, check_vcf=not request.args.get('force', False)) + #vcf_file_buffer, status, vcf_errors, err_msg = download_functions.get_vcf([variant_id], conn, check_vcf=not request.args.get('force', False)) + filepath = functions.get_random_temp_file(fileending = ".vcf", filename_ext = "variant_download_" + str(variant_id)) + status, message = download_functions.write_vcf_file([variant_id], filepath, conn, check_vcf=not request.args.get('force', False)) - if status in ['redirect', 'error']: - flash({"message": "Error during VCF Check: " + vcf_errors + " with error message: " + err_msg + ". Download it anyway", + if status in ['error']: + flash({"message": "Error during VCF generation: " + message + ". Download it anyway", "link": force_url}, "alert-danger") - current_app.logger.error(get_preferred_username() + " tried to download a vcf which contains errors: " + vcf_errors + ". For variant ids: " + str(variant_id)) + current_app.logger.error(get_preferred_username() + " tried to download a vcf which contains errors: " + message + ". For variant ids: " + str(variant_id)) return redirect(redirect_url) current_app.logger.info(get_preferred_username() + " downloaded vcf of variant id: " + str(variant_id)) - return send_file(vcf_file_buffer, as_attachment=True, download_name=download_file_name, mimetype="text/vcf") + return current_app.response_class( + download_functions.stream_file(filepath, remove_after = True), + content_type = "text/event-stream", + headers={'Content-Disposition': f'attachment; filename={download_file_name}', 'X-Accel-Buffering': 'no'} + ) +## listens on get parameter: list_id +#@download_blueprint.route('/download/vcf/variant_list') +#@require_permission(['read_resources']) +#def variant_list(): +# conn = get_connection() +# +# list_id = request.args.get('list_id') +# require_valid(list_id, "user_variant_lists", conn) +# +# # check that the logged in user is the owner of this list +# require_list_permission(list_id, ['read'], conn) +# variant_ids_oi = conn.get_variant_ids_from_list(list_id) +# +# force_url = url_for("download.variant_list", list_id = list_id, force = True) +# redirect_url = url_for("user.my_lists", view = list_id) +# download_file_name = "list_" + str(list_id) + ".vcf" +# +# vcf_file_buffer, status, vcf_errors, err_msg = download_functions.get_vcf(variant_ids_oi, conn) +# +# if status == "redirect": +# flash({"message": "Error during VCF Check: " + vcf_errors + " with error message: " + err_msg + ". Download it anyway", +# "link": force_url}, "alert-danger") +# current_app.logger.error(get_preferred_username() + " tried to download a vcf which contains errors: " + vcf_errors + ". For variant list " + str(list_id)) +# return redirect(redirect_url) +# +# current_app.logger.info(get_preferred_username() + " downloaded vcf of variant list: " + str(list_id)) +# +# return send_file(vcf_file_buffer, as_attachment=True, download_name=download_file_name, mimetype="text/vcf") + # listens on get parameter: list_id @download_blueprint.route('/download/vcf/variant_list') @require_permission(['read_resources']) @@ -57,37 +93,60 @@ def variant_list(): # check that the logged in user is the owner of this list require_list_permission(list_id, ['read'], conn) - variant_ids_oi = conn.get_variant_ids_from_list(list_id) - force_url = url_for("download.variant_list", list_id = list_id, force = True) - redirect_url = url_for("user.my_lists", view = list_id) - download_file_name = "list_" + str(list_id) + ".vcf" + download_queue_id = conn.get_most_recent_download_queue_id(list_id, "list_download") + if download_queue_id is None: + flash("No download available! Generate the VCF first.", "alert-danger") + return redirect(url_for("user.my_lists", view=list_id)) + + download_queue = conn.get_download_queue(download_queue_id) + file_name = download_queue[5] + file_path = paths.download_variant_list_dir + "/" + file_name + is_valid = download_queue[1] + if not is_valid: + flash("No download available! Generate the VCF first.", "alert-danger") + return redirect(url_for("user.my_lists", view=list_id)) + if not functions.is_secure_filename(file_name): + flash("Invalid filename", "alert-danger") + return redirect(url_for("user.my_lists", view=list_id)) + if not os.path.exists(file_path): + flash("No download available! Generate the VCF first.", "alert-danger") + return redirect(url_for("user.my_lists", view=list_id)) - vcf_file_buffer, status, vcf_errors, err_msg = download_functions.get_vcf(variant_ids_oi, conn) + current_app.logger.info(get_preferred_username() + " downloaded vcf of variant list: " + str(list_id)) - if status == "redirect": - flash({"message": "Error during VCF Check: " + vcf_errors + " with error message: " + err_msg + ". Download it anyway", - "link": force_url}, "alert-danger") - current_app.logger.error(get_preferred_username() + " tried to download a vcf which contains errors: " + vcf_errors + ". For variant list " + str(list_id)) - return redirect(redirect_url) + download_filename = "list_" + str(list_id) + ".vcf" + return current_app.response_class( + download_functions.stream_file(file_path), + content_type = "text/event-stream", + headers={'Content-Disposition': f'attachment; filename={download_filename}', 'X-Accel-Buffering': 'no'} + ) - current_app.logger.info(get_preferred_username() + " downloaded vcf of variant list: " + str(list_id)) + +@download_blueprint.route('/generate/vcf/variant_list') +@require_permission(['read_resources']) +def generate_variant_list_vcf(): + conn = get_connection() + + list_id = request.args.get('list_id') + require_valid(list_id, "user_variant_lists", conn) + + # check that the logged in user is the owner of this list + require_list_permission(list_id, ['read'], conn) + + download_queue_id = conn.get_most_recent_download_queue_id(list_id, "list_download") + download_queue = conn.get_download_queue(download_queue_id) + if download_queue is not None: + if download_queue[1] in ["pending", "progress", "retry"]: + return "skipped" - return send_file(vcf_file_buffer, as_attachment=True, download_name=download_file_name, mimetype="text/vcf") + download_queue_id = download_tasks.start_generate_list_vcf(list_id, session["user"]["roles"], conn) + return "success" -from flask import Response, stream_with_context -#@download_blueprint.route('/download/test') -#@require_permission(["admin_resources"]) -#def download_test(): -# return Response( -# stream_with_context(download_functions.test_large_download()), -# headers={'Content-Disposition': 'attachment; filename=test.txt'} -# ) -# -@download_blueprint.route('/download/test_vcf') -@require_permission(["admin_resources"]) -def download_test_vcf(): +@download_blueprint.route('/generate/vcf/variant_list/status') +@require_permission(['read_resources']) +def generate_variant_list_vcf_status(): conn = get_connection() list_id = request.args.get('list_id') @@ -95,13 +154,61 @@ def download_test_vcf(): # check that the logged in user is the owner of this list require_list_permission(list_id, ['read'], conn) - variant_ids_oi = conn.get_variant_ids_from_list(list_id) - return Response( - stream_with_context(download_functions.get_vcf_stream(variant_ids_oi, conn)), - content_type = "text/event-stream", - headers={'Content-Disposition': 'attachment; filename=test.txt', 'X-Accel-Buffering': 'no'} - ) + download_queue_id = conn.get_most_recent_download_queue_id(list_id, "list_download") + download_queue_raw = conn.get_download_queue(download_queue_id, minimal_info = True) #requested_at, status, finished_at, message, is_valid + + if download_queue_raw is None: + return jsonify({ + "requested_at": "", + "status": "no_vcf", + "finished_at": "", + "message": "", + "is_valid": 0 + }) + + return jsonify({ + "requested_at": download_queue_raw[0], + "status": download_queue_raw[1], + "finished_at": download_queue_raw[2], + "message": download_queue_raw[3], + "is_valid": download_queue_raw[4] + }) + + + +#@download_blueprint.route('/download/test') +#@require_permission(["admin_resources"]) +#def download_test(): +# import time +# def test_large_download(): +# for i in range(50): +# yield str(i).encode() +# print(i) +# time.sleep(1) +# return Response( +# stream_with_context(download_functions.test_large_download()), +# content_type = "text/event-stream", +# headers={'Content-Disposition': 'attachment; filename=test.txt', 'X-Accel-Buffering': 'no'} +# ) + +#@download_blueprint.route('/download/test_vcf') +#@require_permission(["admin_resources"]) +#def download_test_vcf(): +# conn = get_connection() +# +# list_id = request.args.get('list_id') +# require_valid(list_id, "user_variant_lists", conn) +# +# # check that the logged in user is the owner of this list +# require_list_permission(list_id, ['read'], conn) +# variant_ids_oi = conn.get_variant_ids_from_list(list_id) +# +# return Response( +# stream_with_context(download_functions.get_vcf_stream(variant_ids_oi, conn)), +# content_type = "text/event-stream", +# headers={'Content-Disposition': 'attachment; filename=test.txt', 'X-Accel-Buffering': 'no'} +# ) # listens on get parameter: raw diff --git a/src/frontend_celery/webapp/download/download_tasks.py b/src/frontend_celery/webapp/download/download_tasks.py new file mode 100644 index 00000000..1676e41d --- /dev/null +++ b/src/frontend_celery/webapp/download/download_tasks.py @@ -0,0 +1,100 @@ +from urllib.error import HTTPError +import sys +from os import path +sys.path.append(path.dirname(path.dirname(path.abspath(__file__)))) +from webapp import celery, utils +from common import functions +from common import paths +from common.db_IO import Connection +from celery.exceptions import Ignore +import time + +from . import download_functions +# errors +from urllib.error import HTTPError +from mysql.connector import Error, InternalError +import traceback + + + +########################################################### +############## START VARIANT TSV BULK IMPORT ############## +########################################################### + +def start_generate_list_vcf(list_id, user_roles, conn: Connection): + functions.mkdir_recursive(paths.download_variant_list_dir) + filename = functions.get_random_temp_file(fileending = ".vcf", filename_ext = "_".join(["list", str(list_id), functions.get_today()]), folder = "") + + request_type = "list_download" + + utils.invalidate_download_queue(list_id, request_type, conn) + download_queue_id = conn.insert_download_request(list_id, request_type, filename) + + task = generate_list_vcf.apply_async(args=[list_id, user_roles, download_queue_id, filename]) # start task + task_id = task.id + + conn.update_download_queue_celery_task_id(download_queue_id, celery_task_id = task_id) # save the task id for status updates + + return download_queue_id + + +@celery.task(bind=True, retry_backoff=5, max_retries=3) +def generate_list_vcf(self, list_id, user_roles, download_queue_id, filename): + """ Background task for importing generating vcf file of variant list """ + self.update_state("PROGRESS") + + try: + conn = Connection(user_roles) + message = "" + status = "success" + + conn.update_download_queue_status(download_queue_id, status = "progress", message = "") + + if not functions.is_secure_filename(filename): # just for safety - might not be required + status = "error" + message = "Invalid filename provided!" + else: + variant_ids_oi = conn.get_variant_ids_from_list(list_id) + filepath = paths.download_variant_list_dir + "/" + filename + status, message = download_functions.write_vcf_file(variant_ids_oi, filepath, conn) + + except InternalError as e: + # deadlock: code 1213 + status = "retry" + message = "Attempting retry because of database error: " + str(e) + ' ' + traceback.format_exc() + except Error as e: + status = "error" + message = "There was a database error: " + str(e) + ' ' + traceback.format_exc() + except HTTPError as e: + status = "retry" + message = "Attempting retry because of http error: " + str(e) + ' ' + traceback.format_exc() + except Exception as e: + status = "error" + message = "There was a runtime error: " + str(e) + ' ' + traceback.format_exc() + + print(status) + #print(message) + + if status != "retry": + conn.close_download_queue(status, download_queue_id, message = message[0:1000]) + else: + conn.update_download_queue_status(download_queue_id, status = status, message = message[0:1000]) + + conn.close() + + if status == 'error': + self.update_state(state="FAILURE", meta={ + 'exc_type': "Runtime error", + 'exc_message': message, + 'custom': '...' + }) + raise Ignore() + elif status == "retry": + self.update_state(state="RETRY", meta={ + 'exc_type': "Runtime error", + 'exc_message': message, + 'custom': '...'}) + generate_list_vcf.retry() + else: + self.update_state(state="SUCCESS") + diff --git a/src/frontend_celery/webapp/static/js/my_lists.js b/src/frontend_celery/webapp/static/js/my_lists.js index ddf78b76..e58c2b97 100644 --- a/src/frontend_celery/webapp/static/js/my_lists.js +++ b/src/frontend_celery/webapp/static/js/my_lists.js @@ -9,6 +9,8 @@ $(document).ready(function() const page_size = flask_data.dataset.pageSize const base_delete_action_url = flask_data.dataset.deleteAction const view_list = flask_data.dataset.viewList + const generate_list_vcf_url = flask_data.dataset.generateListVcfUrl + const generate_list_vcf_status_url = flask_data.dataset.generateListVcfStatusUrl // edit / create button functionality $('#list-modal-submit').click(function(){ @@ -17,25 +19,24 @@ $(document).ready(function() $('#list-modal-form').submit(); }); - $('#export_to_vcf_button').click(function(){ + $('#generate_list_vcf_button').click(function(){ //document.getElementById("export_to_vcf_worker").click() - console.log(view_list) - console.log("started") + console.log(generate_list_vcf_url) $.ajax({ type: 'GET', - url: "/download/vcf/variant_list", - data: {'list_id': view_list}, + url: generate_list_vcf_url, success: function(returnval, status, request) { - console.log(returnval) + show_vcf_gen_status('bg-secondary', "Annotation requested successfully", "Annotation requested") + update_vcf_generation_status(generate_list_vcf_status_url); }, error: function(xhr, status, error) { console.log(xhr) - console.log(status) - console.log(error) + show_vcf_gen_status("bg-danger", "The variant could not be annotated. Either the service is unreachable or some other unexpected exception occured. You can try to reload this page or issue another annotation to fix this. If that does not help try again later. Status code is: " + xhr.status, "Internal error") + $('#reannotate_button').attr('disabled', false) } }); - console.log("after download") }); + update_vcf_generation_status(generate_list_vcf_status_url) var list_id = $('#current-list-id')[0].innerText @@ -103,6 +104,92 @@ $(document).ready(function() + +//
+//{% if generate_list_vcf_status is not none %} +// {% if list_import_status[1] == 'pending' %} +// VCF gen requested +// {% endif %} +// {% if list_import_status[1] == 'success' %} +// VCF gen successful +// {% endif %} +// {% if list_import_status[1] == 'error' %} +// VCF gen error +// {% endif %} +// {% if list_import_status[1] == 'retry' %} +// VCF gen retry +// {% endif %} +// {% if list_import_status[1] == 'aborted' %} +// VCF gen aborted +// {% endif %} +// {% if list_import_status[1] == 'progress' %} +// VCF gen aborted +// {% endif %} +//{% else %} +// No VCF +//{% endif %} +//
+ + +// polling & status display update +function update_vcf_generation_status(status_url) { + // send GET request to status URL (defined by flask) + $.ajax({ + type: 'GET', + url: status_url, + success: function(data, status, request) { + console.log(data) + if (data === undefined) { + show_vcf_gen_status("bg-secondary", "VCF generation is required if you want to download the list in VCF format. You can generate one through the gear button.", "No VCF") + } else if (data["is_valid"] == 0) { + show_vcf_gen_status("bg-secondary", "VCF generation is required if you want to download the list in VCF format. You can generate one through the gear button.", "No VCF") + } else { + if (data['status'] == 'pending') { + show_vcf_gen_status("bg-secondary", "VCF generation is queued. This job is waiting for other jobs to finish first.", "VCF gen queued") + } else if (data['status'] == 'progress') { + show_vcf_gen_status("bg-primary", "VCF generation is being processed in the background. Please wait for it to finish.", "VCF gen processing") + } else if (data['status'] == "success") { + show_vcf_gen_status("bg-success", "VCF generation finished at " + data["finished_at"], "VCF gen success") + } else if (data['status'] == "error") { + show_vcf_gen_status("bg-danger", "VCF generation annotation finished at " + data["finished_at"] + " with fatal error: " + data['message'], "VCF gen error") + } else if (data['status'] == 'retry') { + show_vcf_gen_status("bg-warning", "Task yielded an error: " + data['error_message'] + ". Will try again soon.", "Retrying VCF gen") + } else if (data['status'] == 'aborted') { + show_vcf_gen_status("bg-primary", "This task was manually aborted at " + data['finished_at'], "VCF gen aborted") + } else if (data['status'] == 'no_vcf') { + show_vcf_gen_status("bg-secondary", "VCF generation is required if you want to download the list in VCF format. You can generate one through the gear button.", "No VCF") + } else { + show_vcf_gen_status("bg-warning", "An unexpected status found: " + data['status'], "Unexpected status") + //$('#reannotate_button').attr('disabled', false); + } + + // polling happens here: + // rerun in 5 seconds if state resembles an unfinished task + if (data['status'] == 'pending' || data['status'] == 'progress' || data['status'] == 'retry') { + setTimeout(function() { + update_vcf_generation_status(status_url); + }, 5000); + } + } + }, + error: function(xhr) { + show_vcf_gen_status("bg-danger", "Unable to fetch the status. Server returned http status " + xhr.status, "Internal error") + $('#reannotate_button').attr('disabled', false) + } + }) + +} + +function show_vcf_gen_status(color_class, tooltip_text, inner_text) { + const pill_holder_id = "vcf_gen_pill_holder" + const pill_id = "vcf_gen_pill" + show_status(color_class, tooltip_text, inner_text, pill_holder_id, pill_id) +} + + + + + function create_delete_button(parent, base_delete_action_url, variant_id) { /* This is what the following should look like in html: diff --git a/src/frontend_celery/webapp/static/js/utils.js b/src/frontend_celery/webapp/static/js/utils.js index 308e28f6..75259be5 100644 --- a/src/frontend_celery/webapp/static/js/utils.js +++ b/src/frontend_celery/webapp/static/js/utils.js @@ -669,4 +669,22 @@ function remove_tooltip(element) { element.removeAttribute('data-bs-toggle') element.removeAttribute('data-bs-title') $(element).tooltip('dispose') +} + + + +// utility for showing/updating the current status +function show_status(color_class, tooltip_text, inner_text, pill_holder_id, pill_id) { + $('#' + pill_id).tooltip('hide') // hide tooltip in case it is shown - prevents persisting tooltips on update + var pill_holder = document.getElementById(pill_holder_id) + pill_holder.innerHTML = "" // delete previous pill + var status_pill = document.createElement('span') + status_pill.classList.add('badge') + status_pill.classList.add('rounded-pill') + status_pill.classList.add(color_class) + status_pill.setAttribute('data-bs-toggle', "tooltip") + status_pill.setAttribute('title', tooltip_text) + status_pill.id=pill_id + status_pill.innerText = inner_text + pill_holder.appendChild(status_pill) // add new pill } \ No newline at end of file diff --git a/src/frontend_celery/webapp/static/js/variant_addition.js b/src/frontend_celery/webapp/static/js/variant_addition.js index 92b4c61f..a3df1a5d 100644 --- a/src/frontend_celery/webapp/static/js/variant_addition.js +++ b/src/frontend_celery/webapp/static/js/variant_addition.js @@ -153,21 +153,26 @@ function update_annotation_status(status_url, show_reload_modal=false) { } - -// utility for showing the current annotation status function show_annotation_status(color_class, tooltip_text, inner_text) { - $('#annotation_status_pill').tooltip('hide') - document.getElementById('annotation_status_pill_holder').innerHTML = "" - var status_pill = document.createElement('span') - status_pill.classList.add('badge') - status_pill.classList.add('rounded-pill') - status_pill.classList.add(color_class) - status_pill.setAttribute('data-bs-toggle', "tooltip") - status_pill.setAttribute('title', tooltip_text) - status_pill.innerText = inner_text - annotation_status_pill_holder.appendChild(status_pill) + const pill_holder_id = "annotation_status_pill_holder" + const pill_id = "annotation_status_pill" + show_status(color_class, tooltip_text, inner_text, pill_holder_id, pill_id) } +//// utility for showing the current annotation status +//function show_annotation_status(color_class, tooltip_text, inner_text) { +// $('#annotation_status_pill').tooltip('hide') +// document.getElementById('annotation_status_pill_holder').innerHTML = "" +// var status_pill = document.createElement('span') +// status_pill.classList.add('badge') +// status_pill.classList.add('rounded-pill') +// status_pill.classList.add(color_class) +// status_pill.setAttribute('data-bs-toggle', "tooltip") +// status_pill.setAttribute('title', tooltip_text) +// status_pill.innerText = inner_text +// annotation_status_pill_holder.appendChild(status_pill) +//} + ///////////////////////////////////////////////// ////////////////// IGV UTILITY ////////////////// diff --git a/src/frontend_celery/webapp/tasks.py b/src/frontend_celery/webapp/tasks.py index 0cbe818f..7cb2f46f 100644 --- a/src/frontend_celery/webapp/tasks.py +++ b/src/frontend_celery/webapp/tasks.py @@ -850,7 +850,7 @@ def abort_annotation_tasks(annotation_requests, conn:Connection): def abort_annotation_task(annotation_queue_id, celery_task_id, conn:Connection): if annotation_queue_id is not None: - celery.control.revoke(celery_task_id, terminate = True) + abort_task(celery_task_id) #row_id, status, error_msg conn.update_annotation_queue(annotation_queue_id, "aborted", "") @@ -859,7 +859,8 @@ def abort_annotation_task(annotation_queue_id, celery_task_id, conn:Connection): def purge_celery(): celery.control.purge() - +def abort_task(celery_task_id): + celery.control.revoke(celery_task_id, terminate = True) diff --git a/src/frontend_celery/webapp/templates/index.html b/src/frontend_celery/webapp/templates/index.html index 651551cc..e1dd444a 100644 --- a/src/frontend_celery/webapp/templates/index.html +++ b/src/frontend_celery/webapp/templates/index.html @@ -93,16 +93,19 @@

Overview

Changelog

-
v 1.13.7 (20.08.2024)
+
v 1.14 ()
General changes:
  • Added ACMG schemes for MMR genes (pms2, mlh1, msh2, msh6)
  • +
  • Improved VCF downloads. Variant list VCFs must now be generated seperately prior to downloading. This allows for larger file downloads.
  • +
  • Added upload state search
Bugfixes:
  • Fixed some minor issues with HerediCaRe up- and downloads
  • Downgraded CrossMap to v0.7.0 because v0.7.3 yields wrong results for specific variants
  • +
  • Fixed a bug where the tooltip of status pills would persist if the pill was updated
diff --git a/src/frontend_celery/webapp/templates/macros.html b/src/frontend_celery/webapp/templates/macros.html index b1f1dc40..f0c4cfc6 100644 --- a/src/frontend_celery/webapp/templates/macros.html +++ b/src/frontend_celery/webapp/templates/macros.html @@ -669,8 +669,38 @@ +
+ +
+
+ {% set selected_clinvar_upload_state = request.args.getlist('clinvar_upload_state') %} + +
+ {% for s in static_information['allowed_clinvar_upload_states'] %} +
+ + +
+ {% endfor %} +
+
+
+ +
+ {% set selected_heredicare_upload_state = request.args.getlist('heredicare_upload_state') %} + +
+ {% for s in static_information['allowed_heredicare_upload_states'] %} +
+ + +
+ {% endfor %} +
+
+
{% if static_information['lists'] is not none %} diff --git a/src/frontend_celery/webapp/templates/user/my_lists.html b/src/frontend_celery/webapp/templates/user/my_lists.html index e180d3c9..58acf0c4 100644 --- a/src/frontend_celery/webapp/templates/user/my_lists.html +++ b/src/frontend_celery/webapp/templates/user/my_lists.html @@ -75,11 +75,10 @@

{% block title %} Your variant lists {% endblock %}