Skip to content

Commit

Permalink
added variant import history & bugfixes
Browse files Browse the repository at this point in the history
  • Loading branch information
MarvinDo committed Oct 6, 2023
1 parent 03f0eef commit 57658e8
Show file tree
Hide file tree
Showing 10 changed files with 179 additions and 76 deletions.
4 changes: 1 addition & 3 deletions src/annotation_service/heredicare_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ def get_vid_list(self, min_date = None):
message = "ERROR: HerediCare API get vid list endpoint returned an HTTP " + str(resp.status_code) + " error: " + resp.text
status = "error"
else: # request to heredicare was successful
print("SUCCESS")
json_content = resp.json()['items']

duplicate_vids = []
for vid_raw in json_content:
current_vid = vid_raw['record_id']
Expand Down Expand Up @@ -187,8 +187,6 @@ def convert_heredicare_variant_raw(self, raw_variant):
bearer, message = interface.get_bearer()
if bearer is None:
print(message)
print(bearer)
#bearer = "wc_dFAW61wZelMz_Ef7TjQ"

print("getting vid list")
all_vids_heredicare, message = interface.get_vid_list(bearer)
Expand Down
22 changes: 15 additions & 7 deletions src/annotation_service/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,14 @@ def get_jobs(job_config):


def collect_error_msgs(msg1, msg2):
if len(msg1) > 0 and len(msg2) > 0:
res = msg1 + "\n~~\n" + msg2.strip()
elif len(msg2) > 0:
res = msg2.strip()
else:
res = msg1
res = msg1
if msg2 not in msg1:
if len(msg1) > 0 and len(msg2) > 0:
res = msg1 + "\n~~\n" + msg2.strip()
elif len(msg2) > 0:
res = msg2.strip()
else:
res = msg1
return res


Expand Down Expand Up @@ -164,10 +166,16 @@ def process_one_request(annotation_queue_id, job_config = get_default_job_config
execution_code_vcfcheck, err_msg_vcfcheck, vcf_errors = functions.check_vcf(vcf_path)
if execution_code_vcfcheck != 0:
status = "error"
err_msgs = collect_error_msgs(err_msgs, "VCFCheck errors: " + vcf_errors)
elif vcf_errors != '':
if 'MISO' not in vcf_errors: ### REMOVE LATER!!
err_msgs = collect_error_msgs(err_msgs, "VCFCheck errors: " + vcf_errors)
else:
err_msgs = collect_error_msgs(err_msgs, "VCFCheck error: MISO TERMS!")
else:
pass
print("VCF OK")
err_msgs = collect_error_msgs(err_msgs, vcf_errors)



############################################################
Expand Down
55 changes: 34 additions & 21 deletions src/common/db_IO.py
Original file line number Diff line number Diff line change
Expand Up @@ -1153,23 +1153,27 @@ def update_user_classification(self, user_classification_id, classification, com
self.cursor.execute(command, (str(classification), comment, date, str(scheme_class), user_classification_id))
self.conn.commit()

def delete_variant(self, variant_id):
status = "deleted"
message = "Deleted variant " + str(variant_id)
consensus_classification = self.get_consensus_classification(variant_id)
if len(consensus_classification) > 0: # do not delete if the variant has a consensus classification
status = "skipped"
message = "Did not delete variant because it has consensus classifications"
return status, message
user_classifications = self.get_user_classifications(variant_id)
if len(user_classifications) > 0: # do not delete if the variant has a user classification
status = "skipped"
message = "Did not delete variant because it has user classifications"
return status, message
command = "DELETE FROM variant WHERE id = %s"
self.cursor.execute(command, (variant_id,))
self.conn.commit()
return status, message
#def delete_variant(self, variant_id):
# status = "deleted"
# message = "Deleted variant " + str(variant_id)
# consensus_classification = self.get_consensus_classification(variant_id)
# if consensus_classification is None:
# consensus_classification = []
# if len(consensus_classification) > 0: # do not delete if the variant has a consensus classification
# status = "skipped"
# message = "Did not delete variant because it has consensus classifications"
# return status, message
# user_classifications = self.get_user_classifications(variant_id)
# if user_classifications is None:
# user_classifications = []
# if len(user_classifications) > 0: # do not delete if the variant has a user classification
# status = "skipped"
# message = "Did not delete variant because it has user classifications"
# return status, message
# command = "DELETE FROM variant WHERE id = %s"
# self.cursor.execute(command, (variant_id,))
# self.conn.commit()
# return status, message

#def get_orig_variant(self, variant_id):
# command = "SELECT orig_chr, orig_pos, orig_ref, orig_alt FROM variant WHERE id = %s"
Expand Down Expand Up @@ -1198,12 +1202,19 @@ def get_import_request(self, import_queue_id):
import_request_raw = self.cursor.fetchone()
import_request = self.convert_raw_import_request(import_request_raw)
return import_request

def get_import_request_overview(self):
command = "SELECT id, (SELECT first_name FROM user WHERE user_id = user.id) first_name, (SELECT last_name FROM user WHERE user_id = user.id) last_name, requested_at, status, finished_at, message FROM import_queue ORDER BY requested_at DESC"
self.cursor.execute(command)
res = self.cursor.fetchall()
return res



def get_max_finished_at_import_variant(self, import_queue_id):
command = "SELECT MAX(finished_at) FROM import_variant_queue WHERE import_queue_id = %s"
self.cursor.execute(command, (import_queue_id, ))
result = self.cursor.fetchone()
print(result)
return result[0]

def convert_raw_import_request(self, import_request_raw):
Expand All @@ -1224,9 +1235,11 @@ def convert_raw_import_request(self, import_request_raw):
#5. success: all variants are processed and
status = "unknown"
finished_at = None
if import_variant_list_status == "pending":
if import_variant_list_status == "retry":
status = "retry"
elif import_variant_list_status == "pending":
status = "pending"
elif import_variant_list_status == "processing":
elif import_variant_list_status == "progress":
status = "fetching vids"
elif import_variant_list_status == "success" and any([key_oi in variant_summary for key_oi in ["pending", "progress"]]):
status = "fetching variants"
Expand Down Expand Up @@ -1270,7 +1283,7 @@ def update_import_queue_celery_task_id(self, import_queue_id, celery_task_id):

def close_import_request(self, import_queue_id, status, message):
self.update_import_queue_status(import_queue_id, status, message)
command = "UPDATE import_queue SET finished_at = \"1999-01-01 00:00:00\" WHERE id = %s"
command = "UPDATE import_queue SET finished_at = NOW() WHERE id = %s" # \"1999-01-01 00:00:00\"
self.cursor.execute(command, (import_queue_id, ))
self.conn.commit()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ function update_page(url) {
// polling happens here:
// rerun in 5 seconds if state resembles an unfinished task
const import_status = data["import_request"]["status"]
if (import_status === "pending" || import_status === "fetching vids" || import_status === "fetching variants" || import_status === "unknown") {
if (import_status === "pending" || import_status === "fetching vids" || import_status === "fetching variants" || import_status === "unknown" || import_status === "retry") {

setTimeout(function() {
update_page(url);
Expand All @@ -47,6 +47,7 @@ function update_page(url) {

const status2meta = {
"unknown": {"color": "bg-secondary", "tooltip": "Please wait while the status is being fetched."},
"retry": {"color": "bg-secondary", "tooltip": "There was some unexpected error and the job is retrying now."},
"pending": {"color": "bg-secondary", "tooltip": "The job is queued and waiting for a worker to be picked up."},
"fetching vids": {"color": "bg-primary", "tooltip": "The vids are currently fetched from HerediCare."},
"fetching variants": {"color": "bg-primary", "tooltip": "The vids were fetched from HerediCare. The variants are currently imported to HerediVar."},
Expand Down Expand Up @@ -104,7 +105,6 @@ function update_variant_summary(summary_data) {
"success": "variant_summary_success",
"deleted": "variant_summary_deleted",
"retry": "variant_summary_retrying",
"skipped": "variant_summary_skipped",
"update": "variant_summary_update"
}
for (var key in all_tds) {
Expand Down
97 changes: 58 additions & 39 deletions src/frontend_celery/webapp/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def start_variant_import(user_id, user_roles, conn: Connection): # starts the ce
import_request = conn.get_most_recent_import_request() # get the most recent import request
min_date = None
if import_request is not None:
min_date = import_request.finished_at
min_date = import_request.import_variant_list_finished_at
#print(import_request.finished_at)

new_import_request = conn.insert_import_request(user_id)
Expand All @@ -86,12 +86,26 @@ def heredicare_variant_import(self, user_id, user_roles, min_date, import_queue_
#from frontend_celery.webapp.utils.variant_importer import import_variants
self.update_state(state='PROGRESS')

conn = Connection(user_roles)
try:
conn = Connection(user_roles)

conn.update_import_queue_status(import_queue_id, status = "progress", message = "")
conn.update_import_queue_status(import_queue_id, status = "progress", message = "")

status, message = import_variants(conn, user_id, user_roles, functions.str2datetime(min_date, fmt = '%Y-%m-%dT%H:%M:%S'), import_queue_id)
except InternalError as e:
# deadlock: code 1213
status = "retry"
message = "Attempting retry because of database error: " + str(e) + ' ' + traceback.format_exc()
except Error as e:
status = "error"
message = "There was a database error: " + str(e) + ' ' + traceback.format_exc()
except HTTPError as e:
status = "retry"
message = "Attempting retry because of http error: " + str(e) + ' ' + traceback.format_exc()
except Exception as e:
status = "error"
message = "There was a runtime error: " + str(e) + ' ' + traceback.format_exc()


status, message = import_variants(conn, user_id, user_roles, functions.str2datetime(min_date, fmt = '%Y-%m-%dT%H:%M:%S'), import_queue_id)

if status != "retry":
conn.close_import_request(import_queue_id, status = status, message = message)
Expand Down Expand Up @@ -122,54 +136,63 @@ def import_variants(conn: Connection, user_id, user_roles, min_date, import_queu
status = "success"

vids_heredicare, status, message = heredicare_interface.get_vid_list(min_date)

if status == "success":
all_vids_heredicare, status, message = heredicare_interface.get_vid_list()

vids_heredivar = conn.get_all_external_ids("heredicare")
if status == "success":

vids_heredivar = conn.get_all_external_ids("heredicare")

intersection, heredivar_exclusive_vids, heredicare_exclusive_vids = compare_v_id_lists(vids_heredicare, vids_heredivar)

print("Intersection: " + str(len(intersection)))
print("Deleted: " + str(len(heredivar_exclusive_vids)))
print("New: " + str(len(heredicare_exclusive_vids)))

# spawn one task for each variant import
process_new_vids(heredicare_exclusive_vids + intersection, import_queue_id, user_id, user_roles, conn)
process_deleted_vids(heredivar_exclusive_vids, import_queue_id, user_id, user_roles, conn)

return status, message
intersection, heredivar_exclusive_vids, heredicare_exclusive_vids = compare_v_id_lists(all_vids_heredicare, vids_heredivar, vids_heredicare)

print("Total HerediCare: " + str(len(vids_heredicare)))

print("Intersection: " + str(len(intersection)))
print("Deleted: " + str(len(heredivar_exclusive_vids)))
print("New: " + str(len(heredicare_exclusive_vids)))

#intersection = []
#heredicare_exclusive_vids = [15524910]
#heredivar_exclusive_vids = []

# spawn one task for each variant import
process_new_vids(heredicare_exclusive_vids, import_queue_id, user_id, user_roles, conn)
process_new_vids(intersection, import_queue_id, user_id, user_roles, conn)
process_deleted_vids(heredivar_exclusive_vids, import_queue_id, user_id, user_roles, conn)

return status, message



def process_deleted_vids(vids, import_queue_id, user_id, user_roles, conn: Connection):
vids = []
for vid in vids:
_ = start_delete_variant(vid, import_queue_id, user_id, user_roles, conn)



def process_new_vids(vids, import_queue_id, user_id, user_roles, conn: Connection):
# simply insert them
vids = vids[:1000]
for vid in vids:
_ = start_import_one_variant(vid, import_queue_id, user_id, user_roles, conn)




def compare_v_id_lists(vids_heredicare, vids_heredivar):
vids_heredicare = set(vids_heredicare)
def compare_v_id_lists(all_vids_heredicare, vids_heredivar, vids_oi):
vids_oi = set(vids_oi)
vids_heredivar = set(vids_heredivar)
all_vids_heredicare = set(all_vids_heredicare)

intersection = all_vids_heredicare & vids_heredivar # known vids
heredivar_exclusive_variants = vids_heredivar - all_vids_heredicare # this contains variants which only have a vid in heredivar!!!!
heredicare_exclusive_variants = all_vids_heredicare - vids_heredivar # new vids

# filter for vids of interest
# do not filter deleted vids because they are not returned by the heredicare api
intersection = list(intersection & vids_oi)
heredivar_exclusive_variants = list(heredivar_exclusive_variants)
heredicare_exclusive_variants = list(heredicare_exclusive_variants & vids_oi)

intersection = list(vids_heredivar & vids_heredicare)
heredivar_exclusive_variants = list(vids_heredivar - vids_heredicare) # this contains only variants which have a vid in heredivar!!!!
heredicare_exclusive_variants = list(vids_heredicare - vids_heredivar)

return intersection, heredivar_exclusive_variants, heredicare_exclusive_variants

Expand Down Expand Up @@ -199,9 +222,11 @@ def delete_variant_heredicare(self, vid, user_id, user_roles, import_variant_que
#from frontend_celery.webapp.utils.variant_importer import fetch_heredicare
self.update_state(state='PROGRESS')

conn = Connection(user_roles)


try:
conn = Connection(user_roles)

conn.update_import_variant_queue_status(import_variant_queue_id, status = "progress", message = "")

message = "Removed heredicare vid"
Expand All @@ -210,7 +235,9 @@ def delete_variant_heredicare(self, vid, user_id, user_roles, import_variant_que
all_vids_for_variant = conn.get_external_ids_from_variant_id(variant_id, 'heredicare')
conn.delete_external_id(vid, 'heredicare', variant_id = variant_id)
if len(all_vids_for_variant) <= 1:
status, message = conn.delete_variant(variant_id) # this only deletes if there are no classifications
status = "deleted"
message = "Variant was hidden because it does not have any vids in heredicare anymore"
conn.hide_variant(variant_id, False)
except InternalError as e:
# deadlock: code 1213
status = "retry"
Expand All @@ -225,7 +252,7 @@ def delete_variant_heredicare(self, vid, user_id, user_roles, import_variant_que
status = "error"
message = "There was a runtime error: " + str(e) + ' ' + traceback.format_exc()

print(status)
#print(status)

if status != "retry":
conn.close_import_variant_request(import_variant_queue_id, status = status, message = message)
Expand Down Expand Up @@ -276,9 +303,8 @@ def import_one_variant_heredicare(self, vid, user_id, user_roles, import_variant
#from frontend_celery.webapp.utils.variant_importer import fetch_heredicare
self.update_state(state='PROGRESS')

conn = Connection(user_roles)

try:
conn = Connection(user_roles)
conn.update_import_variant_queue_status(import_variant_queue_id, status = "progress", message = "")
status, message = fetch_heredicare(vid, heredicare_interface, user_id, conn)
except InternalError as e:
Expand All @@ -295,7 +321,7 @@ def import_one_variant_heredicare(self, vid, user_id, user_roles, import_variant
status = "error"
message = "There was a runtime error: " + str(e) + ' ' + traceback.format_exc()

print(status)
#print(status)

if status != "retry":
conn.close_import_variant_request(import_variant_queue_id, status = status, message = message)
Expand Down Expand Up @@ -377,14 +403,6 @@ def fetch_heredicare(vid, heredicare_interface, user_id, conn:Connection): # the

was_successful, message, variant_id = validate_and_insert_variant(chrom, pos, ref, alt, genome_build, conn, user_id, allowed_sequence_letters = allowed_sequence_letters, perform_annotation=perform_annotation)

## vid was moved from one variant to another -> delete it from the existing variant
#if was_successful:
# # check that vid and variant refer to the same thing in heredicare and heredivar
# existing_variant_id = conn.get_variant_id_from_external_id(vid, "heredicare")
# if existing_variant_id is not None:
# if existing_variant_id != variant_id: # sanity check maybe not neccessary
# start_delete_variant(vid, import_queue_id, user_id, user_roles, conn) # deletes the vid and the variant if


if variant_id is not None: # insert new vid
conn.insert_external_variant_id(variant_id, vid, "heredicare")
Expand Down Expand Up @@ -488,6 +506,7 @@ def validate_and_insert_variant(chrom, pos, ref, alt, genome_build, conn: Connec
else:
variant_id = conn.get_variant_id(new_chr, new_pos, new_ref, new_alt)
message = "Variant not imported: already in database!!"
conn.hide_variant(variant_id, True)
was_successful = False
if perform_annotation:
celery_task_id = start_annotation_service(conn, user_id, variant_id) # starts the celery background task
Expand Down
2 changes: 2 additions & 0 deletions src/frontend_celery/webapp/templates/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ <h4>Changelog</h4>
<li>Added variant import from HerediCare</li>
<li>Added HerediCare number of families and heredicare number of cases annotations</li>
<li>Improved criteria popover</li>
<li>Added variant import history page</li>
<li>Enabled HerediCare annotations in admin dashboard</li>
</ul>
Bugfixes:
<ul>
Expand Down
Loading

0 comments on commit 57658e8

Please sign in to comment.