diff --git a/src/common/db_IO.py b/src/common/db_IO.py index c75a8191..8e8ba96e 100644 --- a/src/common/db_IO.py +++ b/src/common/db_IO.py @@ -247,8 +247,7 @@ def insert_variant(self, chr, pos, ref, alt, orig_chr, orig_pos, orig_ref, orig_ self.cursor.execute(command, (chr, pos, ref, alt, orig_chr, orig_pos, orig_ref, orig_alt)) self.conn.commit() variant_id = self.get_variant_id(chr, pos, ref, alt) - self.insert_annotation_request(variant_id, user_id) - return self.get_last_insert_id() # return the annotation_queue_id of the new variant + return variant_id # return the annotation_queue_id of the new variant def insert_external_variant_id(self, variant_id, external_id, id_source): command = "INSERT INTO variant_ids (variant_id, external_id, id_source) \ @@ -1159,25 +1158,85 @@ def get_orig_variant(self, variant_id): res = self.cursor.fetchone() return res + + + def insert_import_request(self, user_id): command = "INSERT INTO import_queue (user_id) VALUES (%s)" self.cursor.execute(command, (user_id, )) self.conn.commit() return self.get_most_recent_import_request() - def close_import_request(self, import_queue_id): - command = "UPDATE import_queue SET status = 'finished', finished_at = NOW() WHERE id = %s" - self.cursor.execute(command, (import_queue_id, )) - self.conn.commit() - def get_most_recent_import_request(self): - self.cursor.execute("SELECT id, user_id, requested_at, status, finished_at FROM import_queue ORDER BY requested_at DESC LIMIT 1") + self.cursor.execute("SELECT id, user_id, requested_at, status, finished_at, message FROM import_queue ORDER BY requested_at DESC LIMIT 1") + import_request_raw = self.cursor.fetchone() + import_request = self.convert_raw_import_request(import_request_raw) + return import_request + + def get_import_request(self, import_queue_id): + command = "SELECT id, user_id, requested_at, status, finished_at, message FROM import_queue WHERE id = %s" + self.cursor.execute(command, (import_queue_id, )) + import_request_raw = self.cursor.fetchone() + import_request = self.convert_raw_import_request(import_request_raw) + return import_request + + def get_max_finished_at_import_variant(self, import_queue_id): + command = "SELECT MAX(finished_at) FROM import_variant_queue WHERE import_queue_id = %s" + self.cursor.execute(command, (import_queue_id, )) result = self.cursor.fetchone() - if result is not None: - user = self.parse_raw_user(self.get_user(result[1])) - requested_at = result[2] # datetime object - finished_at = result[4] # datetime object - result = models.import_request(id = result[0], user = user, requested_at = requested_at, status = result[3], finished_at = finished_at) + print(result) + return result[0] + + def convert_raw_import_request(self, import_request_raw): + if import_request_raw is None: + return None + import_queue_id = import_request_raw[0] + user = self.parse_raw_user(self.get_user(import_request_raw[1])) + variant_summary = self.get_variant_summary(import_queue_id) + requested_at = import_request_raw[2] # datetime object + import_variant_list_finished_at = import_request_raw[4] # datetime object + + import_variant_list_status = import_request_raw[3] + + #1. pending: from status of import + #2. fetching vids: status of import is processing + #3. fetching variants: status of import is success and there are still non finished variants + #4. error: import status is error + #5. success: all variants are processed and + status = "unknown" + finished_at = None + if import_variant_list_status == "pending": + status = "pending" + elif import_variant_list_status == "processing": + status = "fetching vids" + elif import_variant_list_status == "success" and any([key_oi in variant_summary for key_oi in ["pending", "progress"]]): + status = "fetching variants" + elif import_variant_list_status == "error": + status = "error" + finished_at = import_variant_list_finished_at + elif import_variant_list_status == "success": + status = "finished" + finished_at = self.get_max_finished_at_import_variant(import_queue_id) + + result = models.import_request(id = import_queue_id, + user = user, + requested_at = requested_at, + status = status, + finished_at = finished_at, + import_variant_list_status = import_variant_list_status, + import_variant_list_finished_at = import_variant_list_finished_at, + import_variant_list_message = import_request_raw[5], + variant_summary = variant_summary + ) + return result + + def get_variant_summary(self, import_queue_id): + command = "SELECT count(*) as count, status from import_variant_queue WHERE import_queue_id = %s GROUP BY status" + self.cursor.execute(command, (import_queue_id, )) + result_raw = self.cursor.fetchall() + result = {} + for elem in result_raw: + result[elem[1]] = elem[0] return result def update_import_queue_status(self, import_queue_id, status, message): @@ -1185,6 +1244,65 @@ def update_import_queue_status(self, import_queue_id, status, message): self.cursor.execute(command, (status, message, import_queue_id)) self.conn.commit() + def update_import_queue_celery_task_id(self, import_queue_id, celery_task_id): + command = "UPDATE import_queue SET celery_task_id = %s WHERE id = %s" + self.cursor.execute(command, (celery_task_id, import_queue_id)) + self.conn.commit() + + def close_import_request(self, import_queue_id, status, message): + self.update_import_queue_status(import_queue_id, status, message) + command = "UPDATE import_queue SET finished_at = \"1999-01-01 00:00:00\" WHERE id = %s" + self.cursor.execute(command, (import_queue_id, )) + self.conn.commit() + + + + + + def insert_variant_import_request(self, vid, import_queue_id): + command = "INSERT INTO import_variant_queue (vid, import_queue_id) VALUES (%s, %s)" + self.cursor.execute(command, (vid, import_queue_id)) + self.conn.commit() + return self.get_last_insert_id() + + def update_import_variant_queue_celery_id(self, variant_import_queue_id, celery_task_id): + command = "UPDATE import_variant_queue SET celery_task_id = %s WHERE id = %s" + self.cursor.execute(command, (celery_task_id, variant_import_queue_id)) + self.conn.commit() + + def update_import_variant_queue_status(self, variant_import_queue_id, status, message): + command = "UPDATE import_variant_queue SET status = %s, message = %s WHERE id = %s" + self.cursor.execute(command, (status, message, variant_import_queue_id)) + self.conn.commit() + + def close_import_variant_request(self, variant_import_queue_id, status, message): + self.update_import_variant_queue_status(variant_import_queue_id, status, message) + command = "UPDATE import_variant_queue SET finished_at = NOW() WHERE id = %s" + self.cursor.execute(command, (variant_import_queue_id, )) + self.conn.commit() + + def get_imported_variants(self, import_queue_id): + command = "SELECT id, status, requested_at, finished_at, message, vid FROM import_variant_queue WHERE import_queue_id = %s" + self.cursor.execute(command, (import_queue_id, )) + raw_results = self.cursor.fetchall() + return [self.convert_raw_import_variant_request(raw_result) for raw_result in raw_results] + + + def convert_raw_import_variant_request(self, import_variant_request_raw): + if import_variant_request_raw is None: + return None + requested_at = import_variant_request_raw[2] # datetime object + finished_at = import_variant_request_raw[3] # datetime object + result = models.Import_variant_request(id = import_variant_request_raw[0], + status = import_variant_request_raw[1], + requested_at = requested_at, + finished_at = finished_at, + message = import_variant_request_raw[4], + vid = import_variant_request_raw[5] + ) + return result + + # returns a list of external ids if an id source is given # returns a list of tuples with (external_id, source) if no id source is given -> exports all external ids def get_external_ids_from_variant_id(self, variant_id, id_source=''): @@ -1215,6 +1333,7 @@ def update_variant_annotation(self, variant_id, annotation_type_id, value): # us self.cursor.execute(command, (value, variant_id, annotation_type_id)) self.conn.commit() + """ def get_import_request(self, import_queue_id = '', date = ''): command = '' if import_queue_id != '': @@ -1230,6 +1349,7 @@ def get_import_request(self, import_queue_id = '', date = ''): res = self.cursor.fetchone() return res return None + """ def get_heredicare_center_classifications(self, variant_id): command = 'SELECT * FROM heredicare_center_classification WHERE variant_id = %s' diff --git a/src/common/functions.py b/src/common/functions.py index 61de4a4a..e2992f05 100644 --- a/src/common/functions.py +++ b/src/common/functions.py @@ -556,4 +556,10 @@ def get_random_temp_file(fileending): def rm(path): if os.path.exists(path): - os.remove(path) \ No newline at end of file + os.remove(path) + +def str2datetime(datetime_str, fmt): + if datetime_str is None: + return None + else: + return datetime.datetime.strptime(datetime_str, fmt) \ No newline at end of file diff --git a/src/common/models.py b/src/common/models.py index 609a78b8..bcae8672 100644 --- a/src/common/models.py +++ b/src/common/models.py @@ -770,3 +770,16 @@ class import_request: status: str finished_at: datetime.datetime + import_variant_list_status: str + import_variant_list_finished_at: datetime.datetime + import_variant_list_message: str + variant_summary: dict + +@dataclass +class Import_variant_request: + id: int + status: str + requested_at: datetime.datetime + finished_at: datetime.datetime + message: str + vid: str \ No newline at end of file diff --git a/src/frontend_celery/webapp/io/task_helper_routes.py b/src/frontend_celery/webapp/io/task_helper_routes.py index 2ea06aea..234515f4 100644 --- a/src/frontend_celery/webapp/io/task_helper_routes.py +++ b/src/frontend_celery/webapp/io/task_helper_routes.py @@ -2,7 +2,7 @@ from os import path import sys -from ..utils import require_permission, start_annotation_service, get_connection +from ..utils import require_permission, get_connection sys.path.append(path.dirname(path.dirname(path.dirname(path.dirname(path.abspath(__file__)))))) import common.functions as functions from common.db_IO import Connection diff --git a/src/frontend_celery/webapp/static/js/utils.js b/src/frontend_celery/webapp/static/js/utils.js index 23316579..b72f919b 100644 --- a/src/frontend_celery/webapp/static/js/utils.js +++ b/src/frontend_celery/webapp/static/js/utils.js @@ -109,10 +109,13 @@ function filterTable_multiple_columns(filter, table, filter_visible = false) { // adding a row if the filters removed all rows ie. the table is empty function add_default_caption(table) { - const cap = document.createElement("caption"); - cap.setAttribute('class', 'defaultrow'); - cap.textContent ='Nothing to show'; - table.appendChild(cap); + var caps = table.getElementsByClassName('defaultrow'); + if (caps.length == 0) { + const cap = document.createElement("caption"); + cap.setAttribute('class', 'defaultrow'); + cap.textContent ='Nothing to show'; + table.appendChild(cap); + } } // removing the empty-table-row @@ -124,6 +127,49 @@ function remove_default_caption(table) { } +function update_default_caption(table) { + const tbody = table.getElementsByTagName('tbody')[0]; + const rows = tbody.getElementsByTagName('tr'); + if (rows.length > 0) { + remove_default_caption(table); + } else { + add_default_caption(table); + } +} + + + +function activate_datatables(table_id) { + // ACTIVATE DATATABLES + // Setup - add a text input to each header cell + $('#' + table_id + ' thead th').each(function() { + var new_search_input = document.createElement('input') + new_search_input.setAttribute('placeholder', 'search...') + new_search_input.classList.add(this.classList) + $(this).append(new_search_input) + }); + + // DataTable + var the_table = $('#' + table_id).DataTable({ + order: [[0, 'desc']], + }); + + // Apply the search + the_table.columns().eq(0).each(function(colIdx) { + $('input', the_table.column(colIdx).header()).on('keyup change', function() { + the_table + .column(colIdx) + .search(this.value) + .draw(); + }); + + $('input', the_table.column(colIdx).header()).on('click', function(e) { + e.stopPropagation(); + }); + }); +} + + ////////////////////////////////////////////////////////////// ////////////////// sort table functionality ////////////////// diff --git a/src/frontend_celery/webapp/static/js/variant.js b/src/frontend_celery/webapp/static/js/variant.js index 34a25ccb..80104295 100644 --- a/src/frontend_celery/webapp/static/js/variant.js +++ b/src/frontend_celery/webapp/static/js/variant.js @@ -17,35 +17,7 @@ $(document).ready(function() //table_sorter(['#literatureTableYearCol'], '#literatureTable') - - - // ACTIVATE DATATABLES - // Setup - add a text input to each header cell - $('#literatureTable thead th').each(function() { - var new_search_input = document.createElement('input') - new_search_input.setAttribute('placeholder', 'search...') - new_search_input.classList.add(this.classList) - $(this).append(new_search_input) - }); - - // DataTable - var literature_table = $('#literatureTable').DataTable({ - order: [[0, 'desc']], - }); - - // Apply the search - literature_table.columns().eq(0).each(function(colIdx) { - $('input', literature_table.column(colIdx).header()).on('keyup change', function() { - literature_table - .column(colIdx) - .search(this.value) - .draw(); - }); - - $('input', literature_table.column(colIdx).header()).on('click', function(e) { - e.stopPropagation(); - }); - }); + activate_datatables("literatureTable"); // functionality for the hide variant button $('#change_hidden_state').click(function() { diff --git a/src/frontend_celery/webapp/static/js/variant_import_summary.js b/src/frontend_celery/webapp/static/js/variant_import_summary.js new file mode 100644 index 00000000..61139e0b --- /dev/null +++ b/src/frontend_celery/webapp/static/js/variant_import_summary.js @@ -0,0 +1,165 @@ + +const flask_data = document.getElementById("flask_data") +const data_url = flask_data.dataset.getDataUrl; + + + +$(document).ready(function(){ + + activate_datatables("erroneous_variant_table") + + update_page(data_url) + + + + + + +}); + + + + + +// polling & status display update +function update_page(url) { + + $.getJSON(url, function(data) { + + console.log(data) + update_general_information(data) + update_erroneous_variants(data['imported_variants']) + update_variant_summary(data["import_request"]["variant_summary"]) + + // polling happens here: + // rerun in 5 seconds if state resembles an unfinished task + const import_status = data["import_request"]["status"] + if (import_status === "pending" || import_status === "fetching vids" || import_status === "fetching variants") { + + setTimeout(function() { + update_annotation_status(url); + }, 5000); + } + }); + +} + + + + + + + +//function update_page() { +// +// $.ajax({ +// type: 'GET', +// url: data_url, +// success: function(data, status, request) { +// +// console.log(data) +// update_general_information(data) +// update_erroneous_variants(data['imported_variants']) +// update_variant_summary(data["import_request"]["variant_summary"]) +// +// +// }, +// error: function() { +// console.log('error during data retrieval!'); +// } +// }); +// +//} + + + +function update_general_information(data) { + // update total number of variants + document.getElementById("summary_total_num_variants").textContent = data["imported_variants"].length + + // update the user + document.getElementById("summary_user").textContent = data["import_request"]["user"]["full_name"] + + //update the requested date + document.getElementById("summary_requested_at").textContent = data["import_request"]["requested_at"] + + // update general status + document.getElementById("summary_status").textContent = data["import_request"]["status"] + + //update finished at date + document.getElementById("summary_finished_at").textContent = data["import_request"]["finished_at"] + + // update message + document.getElementById("summary_message").textContent = data["import_request"]["import_variant_list_message"] +} + + + +function update_variant_summary(summary_data) { + all_tds = {"pending": "variant_summary_pending", + "processing": "variant_summary_processing", + "error": "variant_summary_erroneous", + "new": "variant_summary_new", + "deleted": "variant_summary_deleted", + "retry": "variant_summary_retrying" + } + + for (var key in all_tds) { + var current_td = document.getElementById(all_tds[key]) + if (key in summary_data) { + current_td.textContent = summary_data[key] + } else { + current_td.textContent = "-" + } + } +} + + +function update_erroneous_variants(variants) { + const table = $('#erroneous_variant_table').DataTable(); + table.clear().draw(); // remove all rows that are currently there + + for (var i = 0; i < variants.length; i++) { + var current_variant = variants[i]; + if (current_variant['status'] === "error") { + var new_trow = create_erroneous_variant_row(current_variant); + table.row.add(new_trow).draw(false) + } + + } + + update_default_caption(document.getElementById("erroneous_variant_table")) + + +} + + +function create_erroneous_variant_row(variant) { + const tds = [ + create_td(variant["vid"]), + create_td(variant["status"]), + create_td(variant["requested_at"]), + create_td(variant["finished_at"]), + create_td(variant["message"]) + ]; + const trow = create_trow(tds); + return trow; +} + + + +function create_td(text_content) { + var td = document.createElement('td'); + td.textContent = text_content; + return td; +} + +function create_trow(tds) { + var trow = document.createElement('tr'); + for (var i = 0; i < tds.length; i++) { + trow.appendChild(tds[i]); + } + return trow; +} + + diff --git a/src/frontend_celery/webapp/tasks.py b/src/frontend_celery/webapp/tasks.py index ede7d86b..872976a5 100644 --- a/src/frontend_celery/webapp/tasks.py +++ b/src/frontend_celery/webapp/tasks.py @@ -3,9 +3,9 @@ import sys from os import path sys.path.append(path.dirname(path.dirname(path.dirname(path.abspath(__file__))))) -#import common.functions as functions +import common.functions as functions from common.db_IO import Connection -from annotation_service.main import process_one_request +from annotation_service.main import process_one_request, get_default_job_config from celery.exceptions import Ignore from flask_mail import Message from flask import render_template @@ -48,114 +48,367 @@ def fetch_consequence_task(self, variant_id): """ -# this uses exponential backoff in case there is a http error -# this will retry 3 times before giving up -# first retry after 5 seconds, second after 25 seconds, third after 125 seconds (if task queue is empty that is) + + + + + + + +################################################################################### +############## IMPORT VARIANT LIST THAT GOT UPDATE SINCE LAST IMPORT ############## +################################################################################### + +def start_variant_import(user_id, user_roles, conn: Connection): # starts the celery task + import_request = conn.get_most_recent_import_request() # get the most recent import request + min_date = None + if import_request is not None: + min_date = import_request.finished_at + #print(import_request.finished_at) + + new_import_request = conn.insert_import_request(user_id) + import_queue_id = new_import_request.id + + task = heredicare_variant_import.apply_async(args=[user_id, user_roles, min_date, import_queue_id]) # start task + task_id = task.id + + conn.update_import_queue_celery_task_id(import_queue_id, celery_task_id = task_id) # save the task id for status updates + + return import_queue_id + @celery.task(bind=True, retry_backoff=5, max_retries=3, time_limit=600) -def annotate_variant(self, annotation_queue_id, job_config): - """Background task for running the annotation service""" - self.update_state(state='PROGRESS', meta={'annotation_queue_id':annotation_queue_id}) - status, runtime_error = process_one_request(annotation_queue_id, job_config=job_config) - status = 'success' +def heredicare_variant_import(self, user_id, user_roles, min_date, import_queue_id): + """Background task for fetching variants from HerediCare""" + #from frontend_celery.webapp.utils.variant_importer import import_variants + self.update_state(state='PROGRESS') + + conn = Connection(user_roles) + + conn.update_import_queue_status(import_queue_id, status = "progress", message = "") + + + status, message = import_variants(conn, user_id, user_roles, functions.str2datetime(min_date, fmt = '%Y-%m-%dT%H:%M:%S'), import_queue_id) + + if status != "retry": + conn.close_import_request(import_queue_id, status = status, message = message) + else: + conn.update_import_queue_status(import_queue_id, status = status, message = message) + + conn.close() + + #status, message = fetch_heredicare(vid, heredicare_interface) if status == 'error': - status = 'FAILURE' - self.update_state(state=status, meta={'annotation_queue_id':annotation_queue_id, + self.update_state(state="FAILURE", meta={ 'exc_type': "Runtime error", - 'exc_message': "The annotation service yielded a runtime error: " + runtime_error, + 'exc_message': message, 'custom': '...' }) raise Ignore() - if status == "retry": - status = "RETRY" - self.update_state(state=status, meta={'annotation_queue_id':annotation_queue_id, + elif status == "retry": + self.update_state(state="RETRY", meta={ 'exc_type': "Runtime error", - 'exc_message': "The annotation service yielded " + runtime_error + "! Will attempt retry.", + 'exc_message': message, 'custom': '...'}) - annotate_variant.retry() - self.update_state(state=status, meta={'annotation_queue_id':annotation_queue_id}) + heredicare_variant_import.retry() + else: + self.update_state(state="SUCCESS") + + +def import_variants(conn: Connection, user_id, user_roles, min_date, import_queue_id): # the task worker + status = "success" + + vids, status, message = heredicare_interface.get_vid_list(min_date) + + if status == "success": + # spawn one task for each variant import + print(len(vids)) + vids = vids[:5] + for vid in vids: + _ = start_import_one_variant(vid, import_queue_id, user_id, user_roles, conn) + + return status, message + + + + + + + + + + +################################################ +############## IMPORT THE VARIANT ############## +################################################ + +def start_import_one_variant(vid, import_queue_id, user_id, user_roles, conn: Connection): # starts the celery task + import_variant_queue_id = conn.insert_variant_import_request(vid, import_queue_id) + + task = import_one_variant_heredicare.apply_async(args=[vid, user_id, user_roles, import_variant_queue_id]) + task_id = task.id + + conn.update_import_variant_queue_celery_id(import_variant_queue_id, celery_task_id = task_id) + + return task_id # this uses exponential backoff in case there is a http error # this will retry 3 times before giving up # first retry after 5 seconds, second after 25 seconds, third after 125 seconds (if task queue is empty that is) @celery.task(bind=True, retry_backoff=5, max_retries=3, time_limit=600) -def import_one_variant_heredicare(self, vid, user_id, user_roles): +def import_one_variant_heredicare(self, vid, user_id, user_roles, import_variant_queue_id): """Background task for fetching variants from HerediCare""" - from frontend_celery.webapp.utils.variant_importer import fetch_heredicare + #from frontend_celery.webapp.utils.variant_importer import fetch_heredicare self.update_state(state='PROGRESS') + conn = Connection(user_roles) + + conn.update_import_variant_queue_status(import_variant_queue_id, status = "progress", message = "") status, message = fetch_heredicare(vid, heredicare_interface, user_id, conn) - conn.close() print(status) - print(message) + + if status != "retry": + conn.close_import_variant_request(import_variant_queue_id, status = status, message = message) + else: + conn.update_import_variant_queue_status(import_variant_queue_id, status = status, message = message) + + conn.close() + if status == 'error': - status = 'FAILURE' - self.update_state(state=status, meta={ + self.update_state(state='FAILURE', meta={ 'exc_type': "Runtime error", 'exc_message': message, 'custom': '...' }) raise Ignore() - if status == "retry": - status = "RETRY" - self.update_state(state=status, meta={ + elif status == "retry": + self.update_state(state="RETRY", meta={ 'exc_type': "Runtime error", 'exc_message': message, 'custom': '...'}) import_one_variant_heredicare.retry() - self.update_state(state=status) + else: + self.update_state(state="SUCCESS") -@celery.task(bind=True, retry_backoff=5, max_retries=3, time_limit=600) -def heredicare_variant_import(self, user_id, user_roles): - """Background task for fetching variants from HerediCare""" - self.update_state(state='PROGRESS') +def fetch_heredicare(vid, heredicare_interface, user_id, conn:Connection): # the task worker + + variant, status, message = heredicare_interface.get_variant(vid) - conn = Connection(user_roles) + if status != 'success': + return status, message - import_status = "progress" - import_request = conn.insert_import_request(user_id) - print(import_request.finished_at) + genome_build = "GRCh38" - vids, status, message = heredicare_interface.get_vid_list(import_request.finished_at) - if status != "success": - import_status = status + # first check if the hg38 information is there + chrom = variant.get('CHROM') + pos = variant.get('POS_HG38') + ref = variant.get('REF_HG38') + alt = variant.get('ALT_HG38') + + # if there is missing information check if there is hg19 information + if any([x is None for x in [chrom, pos, ref, alt]]): + pos = variant.get('POS_HG19') + ref = variant.get('REF_HG19') + alt = variant.get('ALT_HG19') + genome_build = "GRCh37" - conn.update_import_queue_status(import_request.id, import_status, message) + # if there is still missing data check if the variant has hgvs_c information + if any([x is None for x in [chrom, pos, ref, alt]]): + transcript = variant.get('REFSEQ') + hgvs_c = variant.get('CHGVS') - if status != "success": - return None + #TODO: maybe check if you can get some transcripts from the gene??? gene = variant.get('GEN') + if any([x is None for x in [transcript, hgvs_c]]): + status = "error" + message = "Not enough data to convert variant!" + return status, message - # spawn one task for each variant import - print(len(vids)) - vids = vids[:10] - for vid in vids: - task = import_one_variant_heredicare.apply_async(args=[vid, user_id, user_roles]) + chrom, pos, ref, alt, err_msg = functions.hgvsc_to_vcf(hgvs_c, transcript) # convert to vcf - conn.close() + if err_msg != "": # catch runtime errors of hgvs to vcf + status = "error" + message = "HGVS to VCF yieled an error: " + str(err_msg) + return status, message + + # the conversion was not successful + if any([x is None for x in [chrom, pos, ref, alt]]): + status = "error" + message = "HGVS could not be converted to VCF: " + str(transcript) + ":" + str(hgvs_c) + return status, message - #status, message = fetch_heredicare(vid, heredicare_interface) - print(status) - print(message) + + existing_variant_id = conn.get_variant_id_from_external_id(vid, "heredicare") + if existing_variant_id is not None: # vid is already in heredivar + #check that new variant and imported variant are equal + existing_variant = conn.get_variant(existing_variant_id) + #if existing_variant.chrom == + + + was_successful, message, variant_id = validate_and_insert_variant(chrom, pos, ref, alt, genome_build, conn, user_id, allowed_sequence_letters = "ACGT") + if variant_id is not None: + conn.insert_external_variant_id(variant_id, vid, "heredicare") + + if not was_successful: + status = "error" + + return status, message + + + + +def validate_and_insert_variant(chrom, pos, ref, alt, genome_build, conn: Connection, user_id, allowed_sequence_letters = "ACGT", perform_annotation = True): + message = "" + was_successful = True + variant_id = None + # validate request + + chrom, chrom_is_valid = functions.curate_chromosome(chrom) + ref, ref_is_valid = functions.curate_sequence(ref, allowed_sequence_letters) + alt, alt_is_valid = functions.curate_sequence(alt, allowed_sequence_letters) + pos, pos_is_valid = functions.curate_position(pos) + + if not chrom_is_valid: + message = "Chromosome is invalid: " + str(chrom) + elif not ref_is_valid: + message = "Reference base is invalid: " + str(ref) + elif not alt_is_valid: + message = "Alternative base is invalid: " + str(alt) + elif not pos_is_valid: + message = "Position is invalid: " + str(pos) + if not chrom_is_valid or not ref_is_valid or not alt_is_valid or not pos_is_valid: + was_successful = False + return was_successful, message, variant_id + + + + tmp_file_path = functions.get_random_temp_file("vcf") + functions.variant_to_vcf(chrom, pos, ref, alt, tmp_file_path) + + do_liftover = genome_build == 'GRCh37' + returncode, err_msg, command_output, vcf_errors_pre, vcf_errors_post = functions.preprocess_variant(tmp_file_path, do_liftover = do_liftover) + + + if returncode != 0: + message = err_msg + was_successful = False + functions.rm(tmp_file_path) + return was_successful, message, variant_id + if 'ERROR:' in vcf_errors_pre: + message = vcf_errors_pre.replace('\n', ' ') + was_successful = False + functions.rm(tmp_file_path) + return was_successful, message, variant_id + if genome_build == 'GRCh37': + unmapped_variants_vcf = open(tmp_file_path + '.lifted.unmap', 'r') + unmapped_variant = None + for line in unmapped_variants_vcf: + if line.startswith('#') or line.strip() == '': + continue + unmapped_variant = line + break + unmapped_variants_vcf.close() + if unmapped_variant is not None: + message = 'ERROR: could not lift variant ' + unmapped_variant + was_successful = False + functions.rm(tmp_file_path) + functions.rm(tmp_file_path + ".lifted.unmap") + return was_successful, message, variant_id + if 'ERROR:' in vcf_errors_post: + message = vcf_errors_post.replace('\n', ' ') + was_successful = False + functions.rm(tmp_file_path) + functions.rm(tmp_file_path + ".lifted.unmap") + return was_successful, message, variant_id + + if was_successful: + tmp_file = open(tmp_file_path, 'r') + for line in tmp_file: + line = line.strip() + if line.startswith('#') or line == '': + continue + parts = line.split('\t') + new_chr = parts[0] + new_pos = parts[1] + new_ref = parts[3] + new_alt = parts[4] + break # there is only one variant in the file + tmp_file.close() + + is_duplicate = conn.check_variant_duplicate(new_chr, new_pos, new_ref, new_alt) # check if variant is already contained + + if not is_duplicate: + # insert it & capture the annotation_queue_id of the newly inserted variant to start the annotation service in celery + variant_id = conn.insert_variant(new_chr, new_pos, new_ref, new_alt, chrom, pos, ref, alt, user_id) + if perform_annotation: + celery_task_id = start_annotation_service(conn, user_id) # starts the celery background task + else: + variant_id = conn.get_variant_id(new_chr, new_pos, new_ref, new_alt) + message = "Variant not imported: already in database!!" + was_successful = False + + functions.rm(tmp_file_path) + functions.rm(tmp_file_path + ".lifted.unmap") + return was_successful, message, variant_id + + + + + + +########################################################## +############## START THE ANNOTATION SERVICE ############## +########################################################## + +def start_annotation_service(conn: Connection, user_id, variant_id = None, annotation_queue_id = None, job_config = get_default_job_config()): # start the celery task + if variant_id is not None: + annotation_queue_id = conn.insert_annotation_request(variant_id, user_id) # only inserts a new row if there is none with this variant_id & pending + log_postfix = " for variant " + str(variant_id) + else: + log_postfix = " for annotation queue entry " + str(annotation_queue_id) + task = annotate_variant.apply_async(args=[annotation_queue_id, job_config]) + print("Issued annotation for annotation queue id: " + str(annotation_queue_id) + " with celery task id: " + str(task.id) + log_postfix) + #current_app.logger.info(session['user']['preferred_username'] + " started the annotation service for annotation queue id: " + str(annotation_queue_id) + " with celery task id: " + str(task.id) + log_postfix) + conn.insert_celery_task_id(annotation_queue_id, task.id) + return task.id + +# the worker is the annotation service itself! + + +# this uses exponential backoff in case there is a http error +# this will retry 3 times before giving up +# first retry after 5 seconds, second after 25 seconds, third after 125 seconds (if task queue is empty that is) +@celery.task(bind=True, retry_backoff=5, max_retries=3, time_limit=600) +def annotate_variant(self, annotation_queue_id, job_config): + """Background task for running the annotation service""" + self.update_state(state='PROGRESS', meta={'annotation_queue_id':annotation_queue_id}) + status, runtime_error = process_one_request(annotation_queue_id, job_config=job_config) + celery_status = 'success' if status == 'error': - status = 'FAILURE' - self.update_state(state=status, meta={ + celery_status = 'FAILURE' + self.update_state(state=celery_status, meta={'annotation_queue_id':annotation_queue_id, 'exc_type': "Runtime error", - 'exc_message': message, + 'exc_message': "The annotation service yielded a runtime error: " + runtime_error, 'custom': '...' }) raise Ignore() if status == "retry": - status = "RETRY" - self.update_state(state=status, meta={ + celery_status = "RETRY" + self.update_state(state=celery_status, meta={'annotation_queue_id':annotation_queue_id, 'exc_type': "Runtime error", - 'exc_message': message, + 'exc_message': "The annotation service yielded " + runtime_error + "! Will attempt retry.", 'custom': '...'}) - heredicare_variant_import.retry() - self.update_state(state=status) + annotate_variant.retry() + self.update_state(state=celery_status, meta={'annotation_queue_id':annotation_queue_id}) + + + + + + diff --git a/src/frontend_celery/webapp/templates/user/variant_import_summary.html b/src/frontend_celery/webapp/templates/user/variant_import_summary.html new file mode 100644 index 00000000..942c8c0f --- /dev/null +++ b/src/frontend_celery/webapp/templates/user/variant_import_summary.html @@ -0,0 +1,130 @@ +{% extends 'base.html' %} + + +{% block content %} + +
+ + +

{% block title %} Variant import summary {% endblock %}

+ + + +
+
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + +
Requesting user
Requested at
Status
Finished at
Message
Total number of variants
+
+
+ + + + +
+
+ + + + + + + + + + + + + + + + + + + +
Pending variantsProcessing variantsErroneous variantsNew variantsDeleted variantsRetrying variants
+
+
+ + + + + + +
+
+
+

Erroneous variants

+ These are variants that are missing information or had some other issues. The message column contains information about the reason why they were not imported. + +

+

+ + + + + + + + + + + + + +
HerediCare VID
Status
Requested at
Finished at
Message
+
+

+
+
+
+ + + +
+ + + + + +{% endblock %} + +{% block special_scripts %} + +{% endblock %} \ No newline at end of file diff --git a/src/frontend_celery/webapp/user/user_routes.py b/src/frontend_celery/webapp/user/user_routes.py index 84fb9bb5..39306d78 100644 --- a/src/frontend_celery/webapp/user/user_routes.py +++ b/src/frontend_celery/webapp/user/user_routes.py @@ -1,4 +1,4 @@ -from flask import render_template, request, url_for, flash, redirect, Blueprint, current_app, session +from flask import render_template, request, url_for, flash, redirect, Blueprint, current_app, session, jsonify from os import path import sys sys.path.append(path.dirname(path.dirname(path.dirname(path.dirname(path.abspath(__file__)))))) @@ -10,8 +10,7 @@ from ..utils import * from flask_paginate import Pagination import annotation_service.main as annotation_service -from annotation_service.heredicare_interface import heredicare_interface -from ..tasks import import_one_variant_heredicare, heredicare_variant_import +from ..tasks import start_annotation_service, start_variant_import user_blueprint = Blueprint( @@ -346,8 +345,11 @@ def admin_dashboard(): if request_type == 'import_variants': # mass import from heredicare #heredicare_interface = current_app.extensions['heredicare_interface'] #start_variant_import(conn) - heredicare_variant_import.apply_async(args=[session['user']['user_id'], session['user']['roles']]) - do_redirect = True + #heredicare_variant_import.apply_async(args=[session['user']['user_id'], session['user']['roles']]) + import_queue_id = start_variant_import(session['user']['user_id'], session['user']['roles'], conn = conn) + #task = import_one_variant_heredicare.apply_async(args=[12, 30, ["super_user"], 169]) + + return redirect(url_for('user.variant_import_summary', import_queue_id = import_queue_id)) if do_redirect: @@ -355,28 +357,23 @@ def admin_dashboard(): return render_template('user/admin_dashboard.html', most_recent_import_request=most_recent_import_request, job_config = job_config, annotation_stati = annotation_stati, errors = errors, warnings = warnings, total_num_variants = total_num_variants) -def start_variant_import(conn): - import_status = "progress" - import_request = conn.insert_import_request(session['user']['user_id']) - - print(import_request.finished_at) - - vids, status, message = heredicare_interface.get_vid_list(import_request.finished_at) - - if status != "success": - import_status = status - - conn.update_import_queue_status(import_request.id, import_status, message) - if status != "success": - return None +@user_blueprint.route('/variant_import_summary/', methods=('GET', 'POST')) +@require_permission(['admin_resources']) +def variant_import_summary(import_queue_id): + conn = get_connection() + import_request = conn.get_import_request(import_queue_id) + if import_request is None: + abort(404) + return render_template('user/variant_import_summary.html', import_queue_id = import_queue_id) - # spawn one task for each variant import - print(len(vids)) - vids = vids[:5] - for vid in vids: - task = import_one_variant_heredicare.apply_async(args=[vid]) - return status, message +@user_blueprint.route('/variant_import_summary_data/', methods=('GET', 'POST')) +@require_permission(['admin_resources']) +def variant_import_summary_data(import_queue_id): + conn = get_connection() + import_request = conn.get_import_request(import_queue_id) + imported_variants = conn.get_imported_variants(import_queue_id) + return jsonify({'import_request': import_request, 'imported_variants': imported_variants}) \ No newline at end of file diff --git a/src/frontend_celery/webapp/utils/__init__.py b/src/frontend_celery/webapp/utils/__init__.py index c6a666ba..8bcdf893 100644 --- a/src/frontend_celery/webapp/utils/__init__.py +++ b/src/frontend_celery/webapp/utils/__init__.py @@ -10,7 +10,6 @@ from .decorators import * from .search_utils import * from .clinvar_utils import * -from .variant_importer import * from urllib.parse import urlparse, urljoin import pathlib diff --git a/src/frontend_celery/webapp/utils/variant_importer.py b/src/frontend_celery/webapp/utils/variant_importer.py deleted file mode 100644 index f4f6c3ca..00000000 --- a/src/frontend_celery/webapp/utils/variant_importer.py +++ /dev/null @@ -1,197 +0,0 @@ -from os import path -import sys -sys.path.append(path.dirname(path.dirname(path.dirname(path.dirname(path.abspath(__file__)))))) -from common.db_IO import Connection -import common.functions as functions -from ..tasks import annotate_variant -import annotation_service.main as annotation_service - - -def start_annotation_service(conn: Connection, user_id, variant_id = None, annotation_queue_id = None, job_config = annotation_service.get_default_job_config()): - if variant_id is not None: - annotation_queue_id = conn.insert_annotation_request(variant_id, user_id) # only inserts a new row if there is none with this variant_id & pending - log_postfix = " for variant " + str(variant_id) - else: - log_postfix = " for annotation queue entry " + str(annotation_queue_id) - task = annotate_variant.apply_async(args=[annotation_queue_id, job_config]) - print("Issued annotation for annotation queue id: " + str(annotation_queue_id) + " with celery task id: " + str(task.id) + log_postfix) - #current_app.logger.info(session['user']['preferred_username'] + " started the annotation service for annotation queue id: " + str(annotation_queue_id) + " with celery task id: " + str(task.id) + log_postfix) - conn.insert_celery_task_id(annotation_queue_id, task.id) - return task.id - - -def validate_and_insert_variant(chrom, pos, ref, alt, genome_build, conn: Connection, user_id, allowed_sequence_letters = "ACGT", perform_annotation = True): - message = "" - was_successful = True - new_variant = {} - # validate request - - chrom, chrom_is_valid = functions.curate_chromosome(chrom) - ref, ref_is_valid = functions.curate_sequence(ref, allowed_sequence_letters) - alt, alt_is_valid = functions.curate_sequence(alt, allowed_sequence_letters) - pos, pos_is_valid = functions.curate_position(pos) - - if not chrom_is_valid: - message = "Chromosome is invalid: " + str(chrom) - elif not ref_is_valid: - message = "Reference base is invalid: " + str(ref) - elif not alt_is_valid: - message = "Alternative base is invalid: " + str(alt) - elif not pos_is_valid: - message = "Position is invalid: " + str(pos) - if not chrom_is_valid or not ref_is_valid or not alt_is_valid or not pos_is_valid: - was_successful = False - return was_successful, message, new_variant - - - - tmp_file_path = functions.get_random_temp_file("vcf") - functions.variant_to_vcf(chrom, pos, ref, alt, tmp_file_path) - - do_liftover = genome_build == 'GRCh37' - returncode, err_msg, command_output, vcf_errors_pre, vcf_errors_post = functions.preprocess_variant(tmp_file_path, do_liftover = do_liftover) - - - if returncode != 0: - message = err_msg - was_successful = False - functions.rm(tmp_file_path) - return was_successful, message, new_variant - if 'ERROR:' in vcf_errors_pre: - message = vcf_errors_pre.replace('\n', ' ') - was_successful = False - functions.rm(tmp_file_path) - return was_successful, message, new_variant - if genome_build == 'GRCh37': - unmapped_variants_vcf = open(tmp_file_path + '.lifted.unmap', 'r') - unmapped_variant = None - for line in unmapped_variants_vcf: - if line.startswith('#') or line.strip() == '': - continue - unmapped_variant = line - break - unmapped_variants_vcf.close() - if unmapped_variant is not None: - message = 'ERROR: could not lift variant ' + unmapped_variant - was_successful = False - functions.rm(tmp_file_path) - functions.rm(tmp_file_path + ".lifted.unmap") - return was_successful, message, new_variant - if 'ERROR:' in vcf_errors_post: - message = vcf_errors_post.replace('\n', ' ') - was_successful = False - functions.rm(tmp_file_path) - functions.rm(tmp_file_path + ".lifted.unmap") - return was_successful, message, new_variant - - if was_successful: - tmp_file = open(tmp_file_path, 'r') - for line in tmp_file: - line = line.strip() - if line.startswith('#') or line == '': - continue - parts = line.split('\t') - new_chr = parts[0] - new_pos = parts[1] - new_ref = parts[3] - new_alt = parts[4] - new_variant = {'chrom': new_chr, 'pos': new_pos, 'ref': new_ref, 'alt': new_alt} - break # there is only one variant in the file - tmp_file.close() - - is_duplicate = conn.check_variant_duplicate(new_chr, new_pos, new_ref, new_alt) # check if variant is already contained - - if not is_duplicate: - # insert it & capture the annotation_queue_id of the newly inserted variant to start the annotation service in celery - annotation_queue_id = conn.insert_variant(new_chr, new_pos, new_ref, new_alt, chrom, pos, ref, alt, user_id) - if perform_annotation: - celery_task_id = start_annotation_service(conn, user_id, annotation_queue_id = annotation_queue_id) # starts the celery background task - else: - message = "Variant not imported: already in database!!" - was_successful = False - - functions.rm(tmp_file_path) - functions.rm(tmp_file_path + ".lifted.unmap") - return was_successful, message, new_variant - - - - - - - - -def fetch_heredicare(vid, heredicare_interface, user_id, conn:Connection): - - variant, status, message = heredicare_interface.get_variant(vid) - - if status != 'success': - return status, message - - - genome_build = "GRCh38" - - - # first check if the hg38 information is there - chrom = variant.get('CHROM') - pos = variant.get('POS_HG38') - ref = variant.get('REF_HG38') - alt = variant.get('ALT_HG38') - - # if there is missing information check if there is hg19 information - if any([x is None for x in [chrom, pos, ref, alt]]): - pos = variant.get('POS_HG19') - ref = variant.get('REF_HG19') - alt = variant.get('ALT_HG19') - genome_build = "GRCh37" - - # if there is still missing data check if the variant has hgvs_c information - if any([x is None for x in [chrom, pos, ref, alt]]): - transcript = variant.get('REFSEQ') - hgvs_c = variant.get('CHGVS') - - #TODO: maybe check if you can get some transcripts from the gene??? gene = variant.get('GEN') - if any([x is None for x in [transcript, hgvs_c]]): - status = "error" - message = "Not enough data to convert variant!" - return status, message - - chrom, pos, ref, alt, err_msg = functions.hgvsc_to_vcf(hgvs_c, transcript) # convert to vcf - - if err_msg != "": # catch runtime errors of hgvs to vcf - status = "error" - message = "HGVS to VCF yieled an error: " + str(err_msg) - return status, message - - # the conversion was not successful - if any([x is None for x in [chrom, pos, ref, alt]]): - status = "error" - message = "HGVS could not be converted to VCF: " + str(transcript) + ":" + str(hgvs_c) - return status, message - - validate_and_insert_variant(chrom, pos, ref, alt, genome_build, conn, user_id, allowed_sequence_letters = "ACGT") - - return status, message - - - - -# 'VID': '19439917' --> variant id -# 'REV_STAT': '2' --> 2: kein review, 3: review erfolgt, 1: neu angelegt? -# 'QUELLE': '2' --> 1: manuell, 2: upload -# 'GEN': 'MSH2' --> das gen -# 'REFSEQ': 'NM_000251.3' --> Transkript -# 'ART': '-1' --> 1-6 sind klar, was bedeutet -1? -# 'CHROM': '2' --> chromosom -# 'POS_HG19': '47630514' --> hg19 position -# 'REF_HG19': 'G' --> hg19 reference base -# 'ALT_HG19': 'C' --> hg19 alternative base -# 'POS_HG38': '47403375' --> hg38 position -# 'REF_HG38': 'G' --> hg38 reference base -# 'ALT_HG38': 'C' --> hg38 alternative base -# 'KONS': '-1' --> consequence, value 1-10 -# 'KONS_VCF': 'missense_variant' --> consequence?? was ist der Unterschied zu KONS? -# 'CHGVS': 'c.184G>C' --> c.hgvs -# 'PHGVS': 'Gly62Arg' --> p.hgvs - - diff --git a/src/frontend_celery/webapp/variant/variant_routes.py b/src/frontend_celery/webapp/variant/variant_routes.py index 57b89936..e6422fd1 100644 --- a/src/frontend_celery/webapp/variant/variant_routes.py +++ b/src/frontend_celery/webapp/variant/variant_routes.py @@ -5,7 +5,7 @@ import sys from os import path from .variant_functions import * -from ..tasks import generate_consensus_only_vcf_task +from ..tasks import generate_consensus_only_vcf_task, start_annotation_service, validate_and_insert_variant sys.path.append(path.dirname(path.dirname(path.dirname(path.dirname(path.abspath(__file__)))))) import common.functions as functions @@ -160,7 +160,7 @@ def display(variant_id=None, chr=None, pos=None, ref=None, alt=None): current_annotation_status = conn.get_current_annotation_status(variant_id) if current_annotation_status is not None: if current_annotation_status[4] == 'pending' and current_annotation_status[7] is None: - celery_task_id = start_annotation_service(variant_id = variant_id, user_id = session['user']['user_id']) + celery_task_id = start_annotation_service(variant_id = variant_id, conn = conn, user_id = session['user']['user_id']) current_annotation_status = current_annotation_status[0:7] + (celery_task_id, ) variant = conn.get_variant(variant_id)