diff --git a/src/common/db_IO.py b/src/common/db_IO.py
index 3a2608a9..08eb0c2b 100644
--- a/src/common/db_IO.py
+++ b/src/common/db_IO.py
@@ -1242,7 +1242,7 @@ def get_mane_select_for_gene(self, gene, source):
"""
def get_mane_select_for_gene(self, gene_id):
- command = "SELECT name FROM transcript WHERE gene_id = %s AND (is_mane_select=1 or is_mane_plus_clinical=1)"
+ command = "SELECT name FROM transcript WHERE gene_id = %s AND is_mane_select=1"
self.cursor.execute(command, (gene_id, ))
result = self.cursor.fetchall()
return [x[0] for x in result if x[0].startswith("ENST")]
@@ -2088,12 +2088,12 @@ def get_heredicare_center_classifications(self, heredicare_annotation_id):
#result = sorted(result, key=lambda x: functions.convert_none_infinite(x[5]), reverse=True)
return result
- def insert_user(self, username, first_name, last_name, affiliation):
+ def insert_user(self, username, first_name, last_name, affiliation, api_roles):
#command = "INSERT INTO user (username, first_name, last_name, affiliation) \
# SELECT %s FROM DUAL WHERE NOT EXISTS (SELECT * FROM user \
# WHERE `username`=%s LIMIT 1)"
- command = "INSERT INTO user (username, first_name, last_name, affiliation) VALUES (%s, %s, %s, %s) ON DUPLICATE KEY UPDATE first_name=%s, last_name=%s, affiliation=%s"
- self.cursor.execute(command, (username, first_name, last_name, affiliation, first_name, last_name, affiliation))
+ command = "INSERT INTO user (username, first_name, last_name, affiliation, api_roles) VALUES (%s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE first_name=%s, last_name=%s, affiliation=%s, api_roles=%s"
+ self.cursor.execute(command, (username, first_name, last_name, affiliation, api_roles, first_name, last_name, affiliation, api_roles))
self.conn.commit()
def get_user(self, user_id):
@@ -2106,6 +2106,11 @@ def set_api_key(self, username, api_key):
command = "UPDATE user SET api_key = %s WHERE username = %s"
self.cursor.execute(command, (api_key, username))
self.conn.commit()
+
+ def set_api_roles(self, username, api_roles):
+ command = "UPDATE user SET api_roles = %s WHERE username = %s"
+ self.cursor.execute(command, (api_roles, username))
+ self.conn.commit()
def check_api_key(self, api_key: str, username: str) -> bool:
command = "SELECT EXISTS (SELECT * FROM user WHERE api_key = %s AND username = %s)"
@@ -2115,6 +2120,16 @@ def check_api_key(self, api_key: str, username: str) -> bool:
return True
return False
+ def check_api_roles(self, username: str, roles: list):
+ command = "SELECT api_roles FROM user WHERE username = %s"
+ self.cursor.execute(command, (username, ))
+ result = self.cursor.fetchone()[0]
+ db_roles = result.split(';')
+ for role in roles:
+ if role not in db_roles:
+ return False, db_roles
+ return True, db_roles
+
def parse_raw_user(self, raw_user):
return models.User(id = raw_user[0],
full_name = raw_user[2] + ' ' + raw_user[3],
diff --git a/src/frontend_celery/config.py b/src/frontend_celery/config.py
index 76f942cf..6398ba3a 100644
--- a/src/frontend_celery/config.py
+++ b/src/frontend_celery/config.py
@@ -87,6 +87,9 @@ class Config(object):
MAIL_USE_SSL = False
MAIL_DEBUG = False
+ # misc
+ VCF_FILE_IMPORT_ACTIVE = False
+
class ProdConfig(Config):
diff --git a/src/frontend_celery/webapp/api/api_routes.py b/src/frontend_celery/webapp/api/api_routes.py
index 8e853c1a..a975872a 100644
--- a/src/frontend_celery/webapp/api/api_routes.py
+++ b/src/frontend_celery/webapp/api/api_routes.py
@@ -11,6 +11,7 @@
from ..utils import *
from . import api_functions
from webapp import tasks
+from webapp.variant import variant_functions
api_blueprint = Blueprint(
'api',
@@ -19,10 +20,10 @@
@api_blueprint.route('/api/v1.0/get/consensus_classification', methods=['GET'])
-@accept_token
+@require_api_token_permission(["read_only"])
def consensus_classification():
- conn = Connection(roles = ["read_only"])
+ conn = get_connection() #Connection(roles = ["read_only"])
variant_id = request.args.get('variant_id')
if variant_id is None:
@@ -34,11 +35,9 @@ def consensus_classification():
variant_id = conn.get_variant_id(chrom, pos, ref, alt)
if variant_id is None:
- conn.close()
abort(404, "Requested variant does not exist or missing variant information")
variant = conn.get_variant(variant_id, include_annotations = False, include_consensus = True, include_user_classifications = False, include_heredicare_classifications=False, include_automatic_classification=False, include_clinvar=False, include_consequences=False, include_assays=False, include_literature=False, include_external_ids=False)
- conn.close()
v_res = api_functions.prepare_variant(variant)
mrcc = variant.get_recent_consensus_classification()
@@ -52,10 +51,10 @@ def consensus_classification():
return jsonify(result)
-# DEPRECATED - DELETE - use /api/v1.0/check_variant
-@api_blueprint.route('/api/v1.0/get/check_hgvs', methods = ['GET'])
-@accept_token
-def check_hgvs():
+
+@api_blueprint.route('/api/v1.0/check_variant', methods = ['GET'])
+@require_api_token_permission(["read_only"])
+def check():
variant = {
'VID': None,
'CHROM': None,
@@ -76,40 +75,6 @@ def check_hgvs():
'canon_alt': '',
'comment': ''
}
- conn = Connection(roles = ["read_only"])
- status, message = tasks.map_hg38(variant, -1, conn, insert_variant = False, perform_annotation = False, external_ids = None)
- conn.close()
-
- result = {
- "status": status,
- "message": message
- }
- return jsonify(result)
-
-# DEPRECATED - DELETE- use /api/v1.0/check_variant
-@api_blueprint.route('/api/v1.0/get/check_genomic', methods = ['GET'])
-@accept_token
-def check_genomic():
- variant = {
- 'VID': None,
- 'CHROM': None,
- 'POS_HG19': None,
- 'REF_HG19': None,
- 'ALT_HG19': None,
- 'POS_HG38': None,
- 'REF_HG38': None,
- 'ALT_HG38': None,
- 'REFSEQ': None,
- 'CHGVS': None,
- 'CGCHBOC': None,
- 'VISIBLE': 1,
- 'GEN': None,
- 'canon_chrom': '',
- 'canon_pos': '',
- 'canon_ref': '',
- 'canon_alt': '',
- 'comment': ''
- }
genome_build = request.args.get('genome')
if genome_build == "GRCh38":
@@ -123,9 +88,8 @@ def check_genomic():
variant["REF_HG19"] = request.args.get('ref')
variant["ALT_HG19"] = request.args.get('alt')
- conn = Connection(roles = ["read_only"])
+ conn = get_connection() # Connection(roles = ["read_only"])
status, message = tasks.map_hg38(variant, -1, conn, insert_variant = False, perform_annotation = False, external_ids = None)
- conn.close()
result = {
"status": status,
@@ -135,48 +99,16 @@ def check_genomic():
-@api_blueprint.route('/api/v1.0/check_variant', methods = ['GET'])
-@accept_token
-def check():
- variant = {
- 'VID': None,
- 'CHROM': None,
- 'POS_HG19': None,
- 'REF_HG19': None,
- 'ALT_HG19': None,
- 'POS_HG38': None,
- 'REF_HG38': None,
- 'ALT_HG38': None,
- 'REFSEQ': request.args.get('transcript'),
- 'CHGVS': request.args.get('hgvsc'),
- 'CGCHBOC': None,
- 'VISIBLE': 1,
- 'GEN': request.args.get("gene"),
- 'canon_chrom': '',
- 'canon_pos': '',
- 'canon_ref': '',
- 'canon_alt': '',
- 'comment': ''
- }
- genome_build = request.args.get('genome')
- if genome_build == "GRCh38":
- variant["CHROM"] = request.args.get('chrom')
- variant["POS_HG38"] = request.args.get('pos')
- variant["REF_HG38"] = request.args.get('ref')
- variant["ALT_HG38"] = request.args.get('alt')
- elif genome_build == "GRCh37":
- variant["CHROM"] = request.args.get('chrom')
- variant["POS_HG19"] = request.args.get('pos')
- variant["REF_HG19"] = request.args.get('ref')
- variant["ALT_HG19"] = request.args.get('alt')
+@api_blueprint.route('/api/v1.0/post/variant', methods = ['POST'])
+@require_api_token_permission(["user"])
+def insert_variant():
+ conn = get_connection()
+
+ user = conn.parse_raw_user(conn.get_user(session["user"]["user_id"]))
+ create_result, do_redirect = variant_functions.create_variant_from_request(request, user, conn)
+
+ return jsonify(create_result)
+
- conn = Connection(roles = ["read_only"])
- status, message = tasks.map_hg38(variant, -1, conn, insert_variant = False, perform_annotation = False, external_ids = None)
- conn.close()
- result = {
- "status": status,
- "message": message
- }
- return jsonify(result)
diff --git a/src/frontend_celery/webapp/auth/auth_routes.py b/src/frontend_celery/webapp/auth/auth_routes.py
index 7005c818..06889249 100644
--- a/src/frontend_celery/webapp/auth/auth_routes.py
+++ b/src/frontend_celery/webapp/auth/auth_routes.py
@@ -55,6 +55,8 @@ def auth():
first_name = user_info['given_name']
last_name = user_info['family_name']
affiliation = user_info.get('affiliation')
+ roles = user_info['roles']
+
# init the session
session['user'] = user_info
@@ -66,8 +68,9 @@ def auth():
return redirect(url_for('auth.logout', auto_logout='True'))
conn = Connection(session['user']['roles'])
# this inserts only if the user is not already in the database and updates the information if the information changed (except for username this one has to stay)
- conn.insert_user(username, first_name, last_name, affiliation)
- user_info['user_id'] = conn.get_user_id(username)
+ conn.insert_user(username, first_name, last_name, affiliation, ';'.join(roles))
+ user_id = conn.get_user_id(username)
+ user_info['user_id'] = user_id
conn.close()
current_app.logger.info("User " + user_info['preferred_username'] + ' (' + user_info.get('affiliation') + ") successfully logged in.")
@@ -176,10 +179,7 @@ def edit_user(username):
flash('Please provide at least one role!', 'alert-danger')
return render_template('auth/edit_user.html', user = user, avail_roles = avail_roles, set_roles = set_roles)
- if enabled == 'on':
- enabled = True
- else:
- enabled = False
+ enabled = enabled == 'on'
resp = auth_functions.update_user_information(kc_user_id, user['username'], first_name, last_name, e_mail, affiliation, enabled)
@@ -187,6 +187,15 @@ def edit_user(username):
resp = auth_functions.grant_roles(kc_user_id, added_roles, avail_roles)
resp = auth_functions.delete_roles(kc_user_id, deleted_roles, avail_roles)
+ avail_roles = auth_functions.get_roles()
+ new_roles = auth_functions.get_roles_of_user(kc_user_id, avail_roles)
+ final_new_roles = []
+ if enabled:
+ for new_role_index in new_roles:
+ final_new_roles.append(avail_roles[new_role_index]['name'])
+ conn = get_connection()
+ conn.set_api_roles(username, ";".join(final_new_roles))
+
if resp.status_code == 204:
flash('Successfully changed user information!', 'alert-success')
return redirect(url_for('auth.edit_user', username = username))
diff --git a/src/frontend_celery/webapp/tasks.py b/src/frontend_celery/webapp/tasks.py
index 5c9af5ea..d43d2d28 100644
--- a/src/frontend_celery/webapp/tasks.py
+++ b/src/frontend_celery/webapp/tasks.py
@@ -457,8 +457,8 @@ def map_hg38(variant, user_id, conn:Connection, insert_variant = True, perform_a
if not was_successful and hgvs_c_valid and gene_valid:
gene_id = conn.get_gene_id_by_symbol(gene_symbol)
- transcripts = conn.get_gencode_basic_transcripts(gene_id)
- #transcripts = conn.get_mane_select_for_gene(gene_id)
+ #transcripts = conn.get_gencode_basic_transcripts(gene_id)
+ transcripts = conn.get_mane_select_for_gene(gene_id)
if transcripts is not None:
#print(transcripts)
diff --git a/src/frontend_celery/webapp/templates/index.html b/src/frontend_celery/webapp/templates/index.html
index a089a405..428f9d4e 100644
--- a/src/frontend_celery/webapp/templates/index.html
+++ b/src/frontend_celery/webapp/templates/index.html
@@ -93,6 +93,20 @@
Overview
Changelog
+
v 1.13.6 (16.08.2024)
+
+ General changes:
+
+ - Improved API security
+ - Added variant upload and variant check through api
+
+ Bugfixes:
+
+ - Fixed a visual bug when the request to start a new upload to HerediCaRe task was erroneous, but the ClinVar upload was successful
+
+
+
+
v 1.13.5 (15.08.2024)
Bugfixes:
diff --git a/src/frontend_celery/webapp/templates/main/documentation.html b/src/frontend_celery/webapp/templates/main/documentation.html
index d4543a26..ecfee8f3 100644
--- a/src/frontend_celery/webapp/templates/main/documentation.html
+++ b/src/frontend_celery/webapp/templates/main/documentation.html
@@ -568,9 +568,112 @@
Download consensus classification
+
Upload variants
-
+
+
+
+ POST variant
+
+
+
+
+
+ Endpoint |
+ /api/v1.0/post/variant |
+
+
+ Description |
+ Upload a variant in VCF or HGVSc. notation |
+
+
+ Allowed methods |
+ POST |
+
+
+ Headers |
+ Required "Authorization:" header with apikey and username attributes. |
+
+
+ GET-data |
+
+ type: The type of data that is going to be used. Must be one of {"vcf", "hgvs"}
+ |
+
+
+ POST-data |
+
+ One of the following sets is required:
+
+ -
+ genome: The reference genome. Must be one of {GRCh38, GRCh37}
+ chr: The chromosome. Must be one of {chr1, chr2, chr3, chr4, chr5, chr6, chr7, chr8, chr9, chr10, chr11, chr12, chr13, chr14, chr15, chr16, chr17, chr18, chr19, chr20, chr21, chr22, chrX, chrY, chrMT}
+ pos: The chromosomal position relative to the reference genome provided
+ ref: The genomic sequence at the provided position from the reference genome. Can not be longer than 1000 bp and must contain only ACGT letters.
+ alt: The mutated genomic sequence at the provided position. Can not be longer than 1000 bp and must contain only ACGT letters.
+
+ -
+ transcript: The reference transcript. Preferably Ensembl transcript identifiers. The version number is not required
+ hgvsc: The HGVSc. string
+
+
+ |
+
+
+ Response & http status codes |
+
+
+ - 403: authorization header is missing or was set incorrectly
+ - 200: successful request (get details of the submission from the response)
+
+ |
+
+
+ Examples |
+
+ Using the variant id:
+
+ wget --header "Authorization: apikey $apikey username $username" --post-data="chr=chr19&pos=50398920&ref=G&alt=C&genome=GRCh38" "{{url_for('api.insert_variant', type="vcf")}}"
+
+ Using chromosomal coordinates with respect to the GRCh38 reference genome
+
+ wget --header "Authorization: apikey $yourkey username $yourusername" {{url_for('api.consensus_classification', chrom=chr2, pos=214730440, ref="G", alt="A")}}
+
+ Example output variant already in database:
+
+ {
+ "flash_class": "alert-danger flash_id:variant_already_in_database",
+ "flash_link": "/display/1234",
+ "flash_message": "Variant not imported: chr19-50398920-G-C already in database!",
+ "status": "skipped"
+ }
+
+ Example output erroneous variant:
+
+ {
+ "flash_class": "alert-danger flash_id:variant_from_vcf_error",
+ "flash_link": "",
+ "flash_message": "VcfCheck runtime ERROR: VcfCheck 2024_02-84-g1b249e47\nVCF check failed - see 'out' file for details! Code: 1 ERROR: Reference base(s) not correct. Is 'A', should be 'G'! - in line 6:\nchr19\t5039892\t.\tA\tC\t.\t.\t.",
+ "status": "error"
+ }
+
+ Example output success
+
+ {
+ "flash_class": "alert-success flash_id:successful_variant_from_vcf",
+ "flash_link": "/display/1234",
+ "flash_message": "Successfully inserted variant: chr19-5039892-G-C",
+ "status": "success"
+ }
+
+ |
+
+
+
+
+
+
diff --git a/src/frontend_celery/webapp/utils/decorators.py b/src/frontend_celery/webapp/utils/decorators.py
index 7afab4b5..f02ea66f 100644
--- a/src/frontend_celery/webapp/utils/decorators.py
+++ b/src/frontend_celery/webapp/utils/decorators.py
@@ -15,23 +15,45 @@
import common.functions as functions
from common.db_IO import Connection
-# used for api endpoints to check the bearer token header
-# similar to require login, but for api endpoints
-def accept_token(f):
- @wraps(f)
- def decorated_function(*args, **kwargs):
-
- authorization_header = parse_authorization_header(request.headers.get('Authorization'))
+## used for api endpoints to check the bearer token header
+## similar to require login, but for api endpoints
+#def accept_api_token(f):
+# @wraps(f)
+# def decorated_function(*args, **kwargs):
+#
+# authorization_header = parse_authorization_header(request.headers.get('Authorization'))
+#
+# conn = Connection()
+# api_key_ok = conn.check_api_key(authorization_header['apikey'], authorization_header['username'])
+# conn.close()
+#
+# if not api_key_ok:
+# abort(403, "Invalid credentials")
+#
+# return f(*args, **kwargs)
+# return decorated_function
+
+def require_api_token_permission(roles):
+ def decorator(f):
+ @wraps(f)
+ def wrapper(*args, **kwargs):
+ authorization_header = parse_authorization_header(request.headers.get('Authorization'))
- conn = Connection()
- api_key_ok = conn.check_api_key(authorization_header['apikey'], authorization_header['username'])
- conn.close()
+ conn = Connection()
+ api_key_ok = conn.check_api_key(authorization_header['apikey'], authorization_header['username'])
+ roles_ok, api_roles = conn.check_api_roles(authorization_header['username'], roles)
+ user_id = conn.get_user_id(authorization_header['username'])
+ conn.close()
- if not api_key_ok:
- abort(403, "Invalid credentials")
+ if not api_key_ok:
+ abort(403, "Invalid credentials")
+ if not roles_ok:
+ abort(403, "Insufficient privileges")
- return f(*args, **kwargs)
- return decorated_function
+ session['user'] = {'roles': api_roles, 'user_id': user_id,'preferred_username': authorization_header['username']}
+ return f(*args, **kwargs)
+ return wrapper
+ return decorator
def parse_authorization_header(authorization_header: str):
authorization_header = authorization_header.split(' ')
@@ -56,7 +78,8 @@ def parse_authorization_header(authorization_header: str):
abort(403, "Incomplete authorization header. Missing keyword: " + kw)
return result
-
+
+
# a decorator which redirects to the login page if the user is not logged in.
diff --git a/src/frontend_celery/webapp/variant/variant_functions.py b/src/frontend_celery/webapp/variant/variant_functions.py
index 5cb30bc6..a35a6f95 100644
--- a/src/frontend_celery/webapp/variant/variant_functions.py
+++ b/src/frontend_celery/webapp/variant/variant_functions.py
@@ -2,6 +2,7 @@
from ..download.download_routes import calculate_class
from flask import render_template
import io
+from webapp import tasks
@@ -347,17 +348,10 @@ def summarize_heredicare_status(heredicare_queue_entries, publish_queue):
def summarize_clinvar_status(clinvar_queue_entries, publish_queue):
summary = {"status": "unknown", "insert_tasks_message": ""}
if publish_queue is not None:
- if publish_queue.status == 'error':
- summary["status"] = "error"
- summary["insert_tasks_message"] = publish_queue.insert_tasks_message
- if publish_queue.insert_tasks_status == 'pending':
- summary["status"] = "waiting"
- elif publish_queue.insert_tasks_status == 'progress':
- summary["status"] = "requesting"
- elif clinvar_queue_entries is not None:
+ if clinvar_queue_entries is not None:
all_skipped = True
- for heredicare_queue_entry in clinvar_queue_entries:
- current_status = heredicare_queue_entry[3]
+ for clinvar_queue_entry in clinvar_queue_entries:
+ current_status = clinvar_queue_entry[3]
if current_status == 'skipped':
continue
all_skipped = False
@@ -367,4 +361,147 @@ def summarize_clinvar_status(clinvar_queue_entries, publish_queue):
summary["status"] = "multiple stati"
if all_skipped:
summary["status"] = "skipped"
- return summary
\ No newline at end of file
+ else:
+ if publish_queue.status == 'error':
+ summary["status"] = "error"
+ summary["insert_tasks_message"] = publish_queue.insert_tasks_message
+ if publish_queue.insert_tasks_status == 'pending':
+ summary["status"] = "waiting"
+ elif publish_queue.insert_tasks_status == 'progress':
+ summary["status"] = "requesting"
+
+ return summary
+
+
+
+
+#user = self.parse_raw_user(conn.get_user(user_id))
+# session["user"]["user_id"]
+def create_variant_from_request(request_obj, user, conn):
+ result = {
+ "flash_message": "",
+ "flash_class": "",
+ "flash_link": "",
+ "status": ""
+ }
+ do_redirect = False
+
+ create_variant_from = request_obj.args.get("type")
+
+ if not create_variant_from:
+ result["flash_message"] = "Missing type of variant information. Either vcf or hgvs."
+ result["flash_class"] = "alert-danger"
+ result["status"] = "error"
+
+ if create_variant_from == 'vcf':
+ chrom = request_obj.form.get('chr', '')
+ pos = ''.join(request_obj.form.get('pos', '').split())
+ ref = request_obj.form.get('ref', '').upper().strip()
+ alt = request_obj.form.get('alt', '').upper().strip()
+ genome_build = request_obj.form.get('genome')
+
+ # we do not have to check the input parameters in depth here because
+ # they are checked by the validate_and_insert_variant function anyway
+ # here we just make sure that the user submitted **something**
+ # -> better understanding/readability of the error message
+ if not chrom or not pos or not ref or not alt or 'genome' not in request_obj.form:
+ result["flash_message"] = 'All fields are required!'
+ result["flash_class"] = 'alert-danger flash_id:missing_data_vcf'
+ result["status"] = "error"
+ #flash('All fields are required!', 'alert-danger flash_id:missing_data_vcf')
+ else:
+ was_successful, message, variant_id = tasks.validate_and_insert_variant(chrom, pos, ref, alt, genome_build, conn = conn, user_id = user.id)
+ new_variant = conn.get_variant(variant_id, include_annotations=False, include_consensus = False, include_user_classifications = False, include_heredicare_classifications = False, include_automatic_classification=False, include_clinvar = False, include_consequences = False, include_assays = False, include_literature = False, include_external_ids=False)
+ if 'already in database' in message:
+ result["flash_message"] = "Variant not imported: " + new_variant.get_string_repr() + " already in database!"
+ result["flash_class"] = 'alert-danger flash_id:variant_already_in_database'
+ result["flash_link"] = url_for("variant.display", variant_id = new_variant.id)
+ result["status"] = "skipped"
+ #flash({"message": "Variant not imported: " + new_variant.get_string_repr() + " already in database! View the variant",
+ # "link": url_for("variant.display", variant_id = new_variant.id)}, "alert-danger flash_id:variant_already_in_database")
+ elif was_successful:
+ result["flash_message"] = "Successfully inserted variant: " + new_variant.get_string_repr()
+ result["flash_class"] = 'alert-success flash_id:successful_variant_from_vcf'
+ result["flash_link"] = url_for("variant.display", variant_id = new_variant.id)
+ result["status"] = "success"
+ #flash({"message": "Successfully inserted variant: " + new_variant.get_string_repr() + ". View your variant",
+ # "link": url_for("variant.display", variant_id = new_variant.id)}, "alert-success flash_id:successful_variant_from_vcf")
+ current_app.logger.info(str(user.id) + " successfully created a new variant from vcf which resulted in this vcf-style variant: " + ' '.join([str(new_variant.chrom), str(new_variant.pos), new_variant.ref, new_variant.alt, "GRCh38"]))
+ do_redirect = True
+ else: # import had an error
+ result["flash_message"] = message
+ result["flash_class"] = 'alert-danger flash_id:variant_from_vcf_error'
+ result["status"] = "error"
+ #flash(message, 'alert-danger flash_id:variant_from_vcf_error')
+
+ if create_variant_from == 'hgvsc':
+ reference_transcript = request_obj.form.get('transcript')
+ hgvsc = request_obj.form.get('hgvsc')
+
+ if not hgvsc or not reference_transcript:
+ result["flash_message"] = 'All fields are required!'
+ result["flash_class"] = 'alert-danger flash_id:missing_data_hgvs'
+ result["status"] = "error"
+ #flash('All fields are required!', 'alert-danger flash_id:missing_data_hgvs')
+ else:
+ chrom, pos, ref, alt, possible_errors = functions.hgvsc_to_vcf(reference_transcript + ':' + hgvsc)
+ if possible_errors != '':
+ flash_message = possible_errors
+ flash_class = 'alert-danger flash_id:variant_from_hgvs_error'
+ #flash(possible_errors, "alert-danger flash_id:variant_from_hgvs_error")
+ else:
+ was_successful, message, variant_id = tasks.validate_and_insert_variant(chrom, pos, ref, alt, 'GRCh38', conn = conn, user_id = session['user']['user_id'])
+ new_variant = conn.get_variant(variant_id, include_annotations=False, include_consensus = False, include_user_classifications = False, include_heredicare_classifications = False, include_clinvar = False, include_consequences = False, include_assays = False, include_literature = False)
+ if 'already in database' in message:
+ result["flash_message"] = "Variant not imported: " + new_variant.get_string_repr() + " already in database!"
+ result["flash_class"] = 'alert-danger flash_id:variant_already_in_database'
+ result["flash_link"] = url_for("variant.display", variant_id = new_variant.id)
+ result["status"] = "skipped"
+ #flash({"message": "Variant not imported: " + new_variant.get_string_repr() + " already in database! View your variant",
+ # "link": url_for("variant.display", variant_id = new_variant.id)}, "alert-danger flash_id:variant_already_in_database")
+ elif was_successful:
+ result["flash_message"] = "Successfully inserted variant: " + new_variant.get_string_repr()
+ result["flash_class"] = 'alert-success flash_id:successful_variant_from_hgvs'
+ result["flash_link"] = url_for("variant.display", variant_id = new_variant.id)
+ result["status"] = "success"
+ #flash({"message": "Successfully inserted variant: " + new_variant.get_string_repr() + ". View your variant",
+ # "link": url_for("variant.display", variant_id = new_variant.id)}, "alert-success flash_id:successful_variant_from_hgvs")
+ current_app.logger.info(str(user.id) + " successfully created a new variant from hgvs: " + hgvsc + "Which resulted in this vcf-style variant: " + ' '.join([str(new_variant.chrom), str(new_variant.pos), new_variant.ref, new_variant.alt, "GRCh38"]))
+ do_redirect = True
+ else:
+ result["flash_message"] = message
+ result["flash_class"] = "alert-danger flash_id:variant_from_hgvs_error"
+ result["status"] = "error"
+ #flash(message, 'alert-danger flash_id:variant_from_hgvs_error')
+
+ if create_variant_from == 'vcf_file' and current_app.config["vcf_file_import_active"]:
+ genome_build = request_obj.form.get('genome')
+ if 'file' not in request_obj.files or genome_build is None:
+ result["flash_message"] = 'You must specify the genome build and select a vcf file.'
+ result["flash_class"] = "alert-danger"
+ result["status"] = "error"
+ #flash('You must specify the genome build and select a vcf file.', 'alert-danger')
+ else:
+ file = request_obj.files['file']
+ filename = file.filename
+
+ if file.filename.strip() == '' or not functions.filename_allowed(file.filename, allowed_extensions = {"vcf", "txt"}):
+ result["flash_message"] = 'No valid file selected.'
+ result["flash_class"] = "alert-danger"
+ result["status"] = "error"
+ #flash('No valid file selected.', 'alert-danger')
+ else:
+ filepath = functions.get_random_temp_file(fileending = "tsv", filename_ext = "import_vcf")
+ with open(filepath, "w") as f: # file is deleted in task + we have to write to disk because filehandle can not be json serialized and thus, can not be given to a celery task
+ f.write(file.read().decode("utf-8"))
+ user_id = session["user"]["user_id"]
+ user_roles = session["user"]["roles"]
+ #inserted_variants, skipped_variants = variant_functions.insert_variants_vcf_file(vcf_file, genome_build, conn)
+ import_queue_id = tasks.start_variant_import_vcf(user_id, user_roles, conn, filename, filepath, genome_build)
+ result["flash_message"] = 'Successfully submitted vcf file. The import is processed in the background".'
+ result["flash_class"] = "alert-success"
+ result["status"] = "success"
+ #flash("Successfully submitted vcf file. The import is processed in the background", "alert-success")
+ do_redirect = True
+
+ return result, do_redirect
diff --git a/src/frontend_celery/webapp/variant/variant_routes.py b/src/frontend_celery/webapp/variant/variant_routes.py
index 3a9775f7..7f4eaa69 100644
--- a/src/frontend_celery/webapp/variant/variant_routes.py
+++ b/src/frontend_celery/webapp/variant/variant_routes.py
@@ -62,87 +62,21 @@ def search():
def create():
conn = get_connection()
chroms = conn.get_enumtypes("variant", "chr")
- vcf_file_import_active = False
do_redirect = False
if request.method == 'POST':
- create_variant_from = request.args.get("type")
-
- if create_variant_from == 'vcf':
- chrom = request.form.get('chr', '')
- pos = ''.join(request.form.get('pos', '').split())
- ref = request.form.get('ref', '').upper().strip()
- alt = request.form.get('alt', '').upper().strip()
- genome_build = request.form.get('genome')
-
- # we do not have to check the input parameters in depth here because
- # they are checked by the validate_and_insert_variant function anyway
- # here we just make sure that the user submitted **something**
- # -> better understanding/readability of the error message
- if not chrom or not pos or not ref or not alt or 'genome' not in request.form:
- flash('All fields are required!', 'alert-danger flash_id:missing_data_vcf')
- else:
- was_successful, message, variant_id = tasks.validate_and_insert_variant(chrom, pos, ref, alt, genome_build, conn = conn, user_id = session['user']['user_id'])
- new_variant = conn.get_variant(variant_id, include_annotations=False, include_consensus = False, include_user_classifications = False, include_heredicare_classifications = False, include_automatic_classification=False, include_clinvar = False, include_consequences = False, include_assays = False, include_literature = False, include_external_ids=False)
- if 'already in database' in message:
- flash({"message": "Variant not imported: " + new_variant.get_string_repr() + " already in database! View the variant",
- "link": url_for("variant.display", variant_id = new_variant.id)}, "alert-danger flash_id:variant_already_in_database")
- elif was_successful:
- flash({"message": "Successfully inserted variant: " + new_variant.get_string_repr() + ". View your variant",
- "link": url_for("variant.display", variant_id = new_variant.id)}, "alert-success flash_id:successful_variant_from_vcf")
- current_app.logger.info(session['user']['preferred_username'] + " successfully created a new variant from vcf which resulted in this vcf-style variant: " + ' '.join([str(new_variant.chrom), str(new_variant.pos), new_variant.ref, new_variant.alt, "GRCh38"]))
- do_redirect = True
- else: # import had an error
- flash(message, 'alert-danger flash_id:variant_from_vcf_error')
-
- if create_variant_from == 'hgvsc':
- reference_transcript = request.form.get('transcript')
- hgvsc = request.form.get('hgvsc')
-
- if not hgvsc or not reference_transcript:
- flash('All fields are required!', 'alert-danger flash_id:missing_data_hgvs')
- else:
- chrom, pos, ref, alt, possible_errors = functions.hgvsc_to_vcf(reference_transcript + ':' + hgvsc)
- if possible_errors != '':
- flash(possible_errors, "alert-danger flash_id:variant_from_hgvs_error")
- else:
- was_successful, message, variant_id = tasks.validate_and_insert_variant(chrom, pos, ref, alt, 'GRCh38', conn = conn, user_id = session['user']['user_id'])
- new_variant = conn.get_variant(variant_id, include_annotations=False, include_consensus = False, include_user_classifications = False, include_heredicare_classifications = False, include_clinvar = False, include_consequences = False, include_assays = False, include_literature = False)
- if 'already in database' in message:
- flash({"message": "Variant not imported: " + new_variant.get_string_repr() + " already in database! View your variant",
- "link": url_for("variant.display", variant_id = new_variant.id)}, "alert-danger flash_id:variant_already_in_database")
- elif was_successful:
- flash({"message": "Successfully inserted variant: " + new_variant.get_string_repr() + ". View your variant",
- "link": url_for("variant.display", variant_id = new_variant.id)}, "alert-success flash_id:successful_variant_from_hgvs")
- current_app.logger.info(session['user']['preferred_username'] + " successfully created a new variant from hgvs: " + hgvsc + "Which resulted in this vcf-style variant: " + ' '.join([str(new_variant.chrom), str(new_variant.pos), new_variant.ref, new_variant.alt, "GRCh38"]))
- do_redirect = True
- else:
- flash(message, 'alert-danger flash_id:variant_from_hgvs_error')
-
- if create_variant_from == 'vcf_file' and vcf_file_import_active:
- genome_build = request.form.get('genome')
- if 'file' not in request.files or genome_build is None:
- flash('You must specify the genome build and select a vcf file.', 'alert-danger')
- else:
- file = request.files['file']
- filename = file.filename
-
- if file.filename.strip() == '' or not functions.filename_allowed(file.filename, allowed_extensions = {"vcf", "txt"}):
- flash('No valid file selected.', 'alert-danger')
- else:
- filepath = functions.get_random_temp_file(fileending = "tsv", filename_ext = "import_vcf")
- with open(filepath, "w") as f: # file is deleted in task + we have to write to disk because filehandle can not be json serialized and thus, can not be given to a celery task
- f.write(file.read().decode("utf-8"))
- user_id = session["user"]["user_id"]
- user_roles = session["user"]["roles"]
- #inserted_variants, skipped_variants = variant_functions.insert_variants_vcf_file(vcf_file, genome_build, conn)
- import_queue_id = tasks.start_variant_import_vcf(user_id, user_roles, conn, filename, filepath, genome_build)
- flash("Successfully submitted vcf file. The import is processed in the background", "alert-success")
- do_redirect = True
+ user = conn.parse_raw_user(conn.get_user(session["user"]["user_id"]))
+ create_result, do_redirect = variant_functions.create_variant_from_request(request, user, conn)
+ if not create_result["flash_link"]:
+ flash_message = create_result["flash_message"]
+ else:
+ flash_message = {"message": create_result["flash_message"] + " view the variant ", "link": create_result["flash_link"]}
+ flash(flash_message, create_result["flash_class"])
if do_redirect:
return redirect(url_for('variant.create'))
- return render_template('variant/create.html', chrs=chroms, vcf_file_import_active=vcf_file_import_active)
+ return render_template('variant/create.html', chrs=chroms, vcf_file_import_active=current_app.config["VCF_FILE_IMPORT_ACTIVE"])
+